From 4bcfaf435de21744f04b44c3579c887a362e207c Mon Sep 17 00:00:00 2001 From: yacut Date: Sat, 6 Mar 2021 11:02:18 +0100 Subject: [PATCH] add cluster alarms add run once option add custom config option --- CHANGELOG.md | 7 ++ README.md | 1 + cmd/flag.go | 112 +++------------------------- cmd/main.go | 39 ++-------- config.yaml | 9 +++ pkg/config/factory.go | 58 +++++++++++++++ pkg/config/load.go | 114 ++++++++++++++++++++++++++++ pkg/config/main.go | 19 ++++- pkg/config/validate.go | 53 +++++++++++++ pkg/incident/main.go | 14 ++++ pkg/watcher/cluster.go | 100 +++++++++++++++++++++++++ pkg/watcher/main.go | 55 ++++++++++++-- pkg/watcher/node.go | 97 ++++++++++++++++++++++++ pkg/watcher/node_checker.go | 94 ++--------------------- pkg/watcher/node_informer.go | 22 +----- pkg/watcher/pod.go | 140 +++++++++++++++++++++++++++++++++++ pkg/watcher/pod_checker.go | 103 ++------------------------ pkg/watcher/pod_informer.go | 48 +----------- version.go | 2 +- 19 files changed, 696 insertions(+), 391 deletions(-) create mode 100644 pkg/config/factory.go create mode 100644 pkg/config/load.go create mode 100644 pkg/config/validate.go create mode 100644 pkg/watcher/cluster.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8be704c..7803f22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## v1.6.0 / 2021-03-05 + +- [FEATURE] Add cluster alarms +- [FEATURE] Add serverless support via `watcher.RunOnce(...)` +- [FEATURE] Add option to run checks only once via `--run-once` flag +- [ENHANCEMENT] Add insecure option for the kubernetes API server + ## v1.5.0 / 2021-03-02 - [ENHANCEMENT] Split CPU and memory resource config for better configuration opportunities diff --git a/README.md b/README.md index 7903b69..05954fb 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ server and generates incidents about the health state of the pods and the nodes. Simply build and run ilert-kube-agent to get Kubernetes cluster alarms. | Flag | Description | | ----------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `--alarms.cluster.enabled` | Enables cluster alarms. Triggers an alarm if any cluster problem occurred e.g. API server not available [Default: true] | | `--alarms.pods.terminate.enabled` | Enables terminate pod alarms. Triggers an alarm if any pod terminated e.g. Terminated, OOMKilled, Error, ContainerCannotRun, DeadlineExceeded [Default: true] | | `--alarms.pods.waiting.enabled` | Enables waiting pod alarms. Triggers an alarm if any pod in waiting status e.g. CrashLoopBackOff, ErrImagePull, ImagePullBackOff, CreateContainerConfigError, InvalidImageName, CreateContainerError [Default: true] | | `--alarms.pods.restarts.enabled` | Enables restarts pod alarms. 
Triggers an alarm if any pod restarts count reached threshold [Default: true] | diff --git a/cmd/flag.go b/cmd/flag.go index bcdbda2..4fa6021 100644 --- a/cmd/flag.go +++ b/cmd/flag.go @@ -4,7 +4,6 @@ import ( "flag" "fmt" "os" - "strings" "github.com/rs/zerolog/log" "github.com/spf13/pflag" @@ -13,12 +12,12 @@ import ( shared "github.com/iLert/ilert-kube-agent" "github.com/iLert/ilert-kube-agent/pkg/config" "github.com/iLert/ilert-kube-agent/pkg/logger" - "github.com/iLert/ilert-kube-agent/pkg/utils" ) var ( help bool version bool + runOnce bool cfgFile string ) @@ -26,10 +25,12 @@ func parseAndValidateFlags() *config.Config { flag.BoolVar(&help, "help", false, "Print this help.") flag.BoolVar(&version, "version", false, "Print version.") + flag.BoolVar(&runOnce, "run-once", false, "Run checks only once and exit.") flag.StringVar(&cfgFile, "config", "", "Config file") flag.String("settings.kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") flag.String("settings.master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") + flag.Bool("settings.insecure", false, "The Kubernetes API server should be accessed without verifying the TLS certificate. Overrides any value in kubeconfig. Only required if out-of-cluster.") flag.String("settings.namespace", "kube-system", "Namespace in which agent run.") flag.String("settings.log.level", "info", "Log level (debug, info, warn, error, fatal).") flag.Bool("settings.log.json", false, "Enable json format log") @@ -38,6 +39,9 @@ func parseAndValidateFlags() *config.Config { flag.String("settings.apiKey", "", "(REQUIRED) The iLert alert source api key") flag.String("settings.checkInterval", "15s", "The evaluation check interval e.g. 
resources check") + flag.Bool("alarms.cluster.enabled", true, "Enable cluster alarms") + flag.String("alarms.cluster.priority", "HIGH", "The cluster alarm incident priority") + flag.Bool("alarms.pods.enabled", true, "Enable pod alarms") flag.Bool("alarms.pods.terminate.enabled", true, "Enable pod terminate alarms") flag.String("alarms.pods.terminate.priority", "HIGH", "The pod terminate alarm incident priority") @@ -69,9 +73,6 @@ func parseAndValidateFlags() *config.Config { pflag.Parse() viper.RegisterAlias("settings.api-key", "settings.apiKey") - viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_")) - viper.SetEnvPrefix("ilert") - viper.AutomaticEnv() err := viper.BindPFlags(pflag.CommandLine) if err != nil { @@ -88,105 +89,16 @@ func parseAndValidateFlags() *config.Config { os.Exit(0) } - if cfgFile != "" { - log.Debug().Str("file", cfgFile).Msg("Reading config file") - viper.SetConfigFile(cfgFile) - err := viper.ReadInConfig() - if err != nil { - log.Fatal().Err(err).Msg("Unable to read config") - } - } - cfg := &config.Config{} - err = viper.Unmarshal(cfg) - if err != nil { - log.Fatal().Err(err).Msg("Unable to decode config") - } - - if cfg.Links.Pods == nil { - cfg.Links.Pods = make([]config.ConfigLinksSetting, 0) - } - if cfg.Links.Nodes == nil { - cfg.Links.Nodes = make([]config.ConfigLinksSetting, 0) + if cfgFile != "" { + cfg.SetConfigFile(cfgFile) } - - for _, e := range os.Environ() { - pair := strings.SplitN(e, "=", 2) - if strings.HasPrefix(pair[0], "ILERT_LINKS_PODS_") { - link := strings.ReplaceAll(pair[0], "ILERT_LINKS_PODS_", "") - cfg.Links.Pods = append(cfg.Links.Pods, config.ConfigLinksSetting{ - Name: strings.Title(strings.ToLower(strings.ReplaceAll(link, "_", " "))), - Href: pair[1], - }) - } - - if strings.HasPrefix(pair[0], "ILERT_LINKS_NODES_") { - cfg.Links.Nodes = append(cfg.Links.Nodes, config.ConfigLinksSetting{ - Name: strings.Title(strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(pair[0], "ILERT_LINKS_NODES_", ""), "_", " "))), - Href: pair[1], - }) - } + if runOnce { + cfg.SetRunOnce(true) } - + cfg.Load() + cfg.Validate() logger.Init(cfg.Settings.Log) - ilertAPIKeyEnv := utils.GetEnv("ILERT_API_KEY", "") - if ilertAPIKeyEnv != "" { - cfg.Settings.APIKey = ilertAPIKeyEnv - } - - namespaceEnv := utils.GetEnv("NAMESPACE", "") - if namespaceEnv != "" { - cfg.Settings.Namespace = namespaceEnv - } - - logLevelEnv := utils.GetEnv("LOG_LEVEL", "") - if logLevelEnv != "" { - cfg.Settings.Log.Level = logLevelEnv - } - - if cfg.Settings.ElectionID == "" { - log.Fatal().Msg("Election ID is required.") - } - - if cfg.Settings.Namespace == "" { - log.Fatal().Msg("Namespace is required. Use --settings.namespace flag or NAMESPACE env var") - } - - if cfg.Settings.APIKey == "" { - log.Fatal().Msg("iLert api key is required. 
Use --settings.apiKey flag or ILERT_API_KEY env var") - } - - if cfg.Settings.Log.Level != "debug" && cfg.Settings.Log.Level != "info" && cfg.Settings.Log.Level != "warn" && cfg.Settings.Log.Level != "error" && cfg.Settings.Log.Level != "fatal" { - log.Fatal().Msg("Invalid --settings.log.level flag value or config.") - } - - checkPriorityConfig(cfg.Alarms.Pods.Terminate.Priority, "--alarms.pods.terminate.priority") - checkPriorityConfig(cfg.Alarms.Pods.Waiting.Priority, "--alarms.pods.waiting.priority") - checkPriorityConfig(cfg.Alarms.Pods.Restarts.Priority, "--alarms.pods.restarts.priority") - checkPriorityConfig(cfg.Alarms.Pods.Resources.CPU.Priority, "--alarms.pods.resources.cpu.priority") - checkPriorityConfig(cfg.Alarms.Pods.Resources.Memory.Priority, "--alarms.pods.resources.memory.priority") - checkPriorityConfig(cfg.Alarms.Nodes.Terminate.Priority, "--alarms.nodes.terminate.priority") - checkPriorityConfig(cfg.Alarms.Nodes.Resources.CPU.Priority, "--alarms.nodes.resources.cpu.priority") - checkPriorityConfig(cfg.Alarms.Nodes.Resources.Memory.Priority, "--alarms.nodes.resources.memory.priority") - - checkThresholdConfig(cfg.Alarms.Pods.Resources.CPU.Threshold, 1, 100, "--alarms.pods.resources.cpu.threshold") - checkThresholdConfig(cfg.Alarms.Pods.Resources.Memory.Threshold, 1, 100, "--alarms.pods.resources.memory.threshold") - checkThresholdConfig(cfg.Alarms.Pods.Restarts.Threshold, 1, 1000000, "--alarms.pods.restarts.threshold") - checkThresholdConfig(cfg.Alarms.Pods.Resources.CPU.Threshold, 1, 100, "--alarms.nodes.resources.cpu.threshold") - checkThresholdConfig(cfg.Alarms.Pods.Resources.Memory.Threshold, 1, 100, "--alarms.nodes.resources.memory.threshold") - return cfg } - -func checkPriorityConfig(priority string, flag string) { - if priority != "HIGH" && priority != "LOW" { - log.Fatal().Msg(fmt.Sprintf("Invalid %s flag value.", flag)) - } -} - -func checkThresholdConfig(threshold int32, min int32, max int32, flag string) { - if threshold < min || threshold > max { - log.Fatal().Msg(fmt.Sprintf("Invalid %s flag value (min=%d max=%d).", flag, min, max)) - } -} diff --git a/cmd/main.go b/cmd/main.go index defb531..e80f5cb 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -9,7 +9,6 @@ import ( "syscall" "time" - agentclientset "github.com/iLert/ilert-kube-agent/pkg/client/clientset/versioned" "github.com/iLert/ilert-kube-agent/pkg/router" "github.com/iLert/ilert-kube-agent/pkg/storage" "github.com/iLert/ilert-kube-agent/pkg/watcher" @@ -17,12 +16,9 @@ import ( "github.com/rs/zerolog/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" - metrics "k8s.io/metrics/pkg/client/clientset/versioned" ) const ( @@ -36,6 +32,11 @@ func main() { log.Info().Interface("config", cfg).Msg("Starting agent with config") + if cfg.GetRunOnce() { + watcher.RunOnce(cfg) + return + } + srg := &storage.Storage{} srg.Init() router := router.Setup(srg) @@ -60,38 +61,12 @@ func main() { log.Fatal().Err(err).Msg("Unable to get hostname") } - config, err := clientcmd.BuildConfigFromFlags(cfg.Settings.Master, cfg.Settings.KubeConfig) - if err != nil { - log.Fatal().Err(err).Msg("Failed to build kubeconfig") - } - - kubeClient, err := kubernetes.NewForConfig(config) - if err != nil { - log.Fatal().Err(err).Msg("Failed to create kube client") - } - - agentKubeClient, err := agentclientset.NewForConfig(config) - if 
err != nil { - log.Fatal().Err(err).Msg("Failed to create kube client") - } - - metricsClient, err := metrics.NewForConfig(config) - if err != nil { - log.Fatal().Err(err).Msg("Failed to create metrics client") - } - - // Validate that the client is ok. - _, err = kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{}) - if err != nil { - log.Fatal().Err(err).Msg("Failed to get nodes from apiserver") - } - lock := &resourcelock.LeaseLock{ LeaseMeta: metav1.ObjectMeta{ Name: cfg.Settings.ElectionID, Namespace: cfg.Settings.Namespace, }, - Client: kubeClient.CoordinationV1(), + Client: cfg.KubeClient.CoordinationV1(), LockConfig: resourcelock.ResourceLockConfig{ Identity: id, }, @@ -117,7 +92,7 @@ func main() { Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(_ context.Context) { log.Info().Str("identity", id).Msg("I am the new leader") - watcher.Start(kubeClient, metricsClient, agentKubeClient, cfg) + watcher.Start(cfg) }, OnStoppedLeading: func() { watcher.Stop() diff --git a/config.yaml b/config.yaml index d36037f..7a2443a 100644 --- a/config.yaml +++ b/config.yaml @@ -8,6 +8,9 @@ settings: ## The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster. # master: "" + ## The Kubernetes API server should be accessed without verifying the TLS certificate. Overrides any value in kubeconfig. Only required if out-of-cluster. + # insecure: false + ## Namespace in which agent run. namespace: kube-systems @@ -27,6 +30,12 @@ settings: json: false alarms: + cluster: + ## Enables cluster alarms + enabled: true + ## The cluster alarm incident priority + priority: HIGH + pods: ## Enables all pod alarms enabled: false diff --git a/pkg/config/factory.go b/pkg/config/factory.go new file mode 100644 index 0000000..e073ffe --- /dev/null +++ b/pkg/config/factory.go @@ -0,0 +1,58 @@ +package config + +import ( + "github.com/rs/zerolog/log" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + metrics "k8s.io/metrics/pkg/client/clientset/versioned" + + agentclientset "github.com/iLert/ilert-kube-agent/pkg/client/clientset/versioned" +) + +// SetKubeConfig override default kube config +func (cfg *Config) SetKubeConfig(config *rest.Config) { + cfg.KubeConfig = config +} + +func (cfg *Config) initializeClients() { + if cfg.KubeConfig == nil { + config, err := clientcmd.BuildConfigFromFlags(cfg.Settings.Master, cfg.Settings.KubeConfig) + if err != nil { + log.Fatal().Err(err).Msg("Failed to build kubeconfig") + } else { + cfg.KubeConfig = config + } + + if cfg.Settings.Insecure { + config.Insecure = true + } + } + + if cfg.KubeClient == nil { + kubeClient, err := kubernetes.NewForConfig(cfg.KubeConfig) + if err != nil { + log.Fatal().Err(err).Msg("Failed to create kube client") + } else { + cfg.KubeClient = kubeClient + } + } + + if cfg.AgentKubeClient == nil { + agentKubeClient, err := agentclientset.NewForConfig(cfg.KubeConfig) + if err != nil { + log.Fatal().Err(err).Msg("Failed to create kube agent client") + } else { + cfg.AgentKubeClient = agentKubeClient + } + } + + if cfg.MetricsClient == nil { + metricsClient, err := metrics.NewForConfig(cfg.KubeConfig) + if err != nil { + log.Fatal().Err(err).Msg("Failed to create metrics client") + } else { + cfg.MetricsClient = metricsClient + } + } +} diff --git a/pkg/config/load.go b/pkg/config/load.go new file mode 100644 index 0000000..4174ec4 --- /dev/null +++ b/pkg/config/load.go @@ -0,0 +1,114 @@ +package config + +import ( + 
"encoding/base64" + "fmt" + "os" + "strings" + + "github.com/iLert/ilert-kube-agent/pkg/utils" + "github.com/rs/zerolog/log" + "github.com/spf13/viper" +) + +var runOnce bool + +func (cfg *Config) SetRunOnce(r bool) { + runOnce = r +} + +func (cfg *Config) GetRunOnce() bool { + return runOnce +} + +// SetConfigFile set config file path and read it into struct +func (cfg *Config) SetConfigFile(cfgFile string) { + if cfgFile != "" { + log.Debug().Str("file", cfgFile).Msg("Reading config file") + viper.SetConfigFile(cfgFile) + err := viper.ReadInConfig() + if err != nil { + log.Fatal().Err(err).Msg("Unable to read config") + } + } +} + +// Load reads config from file, envs or flags +func (cfg *Config) Load() { + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_")) + viper.SetEnvPrefix("ilert") + viper.AutomaticEnv() + + err := viper.Unmarshal(cfg) + if err != nil { + log.Fatal().Err(err).Msg("Unable to decode config") + } + + if cfg.Links.Pods == nil { + cfg.Links.Pods = make([]ConfigLinksSetting, 0) + } + if cfg.Links.Nodes == nil { + cfg.Links.Nodes = make([]ConfigLinksSetting, 0) + } + + for _, e := range os.Environ() { + pair := strings.SplitN(e, "=", 2) + if strings.HasPrefix(pair[0], "ILERT_LINKS_PODS_") { + link := strings.ReplaceAll(pair[0], "ILERT_LINKS_PODS_", "") + cfg.Links.Pods = append(cfg.Links.Pods, ConfigLinksSetting{ + Name: strings.Title(strings.ToLower(strings.ReplaceAll(link, "_", " "))), + Href: pair[1], + }) + } + + if strings.HasPrefix(pair[0], "ILERT_LINKS_NODES_") { + cfg.Links.Nodes = append(cfg.Links.Nodes, ConfigLinksSetting{ + Name: strings.Title(strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(pair[0], "ILERT_LINKS_NODES_", ""), "_", " "))), + Href: pair[1], + }) + } + } + + ilertAPIKeyEnv := utils.GetEnv("ILERT_API_KEY", "") + if ilertAPIKeyEnv != "" { + cfg.Settings.APIKey = ilertAPIKeyEnv + } + + namespaceEnv := utils.GetEnv("NAMESPACE", "") + if namespaceEnv != "" { + cfg.Settings.Namespace = namespaceEnv + } + + logLevelEnv := utils.GetEnv("LOG_LEVEL", "") + if logLevelEnv != "" { + cfg.Settings.Log.Level = logLevelEnv + } + + cfg.Validate() + + // Base64 encoded kubeconfig + encodedKubeConfig := utils.GetEnv("KUBECONFIG", "") + if encodedKubeConfig != "" { + kubeConfigBytes, err := base64.StdEncoding.DecodeString(encodedKubeConfig) + if err != nil { + log.Fatal().Err(err).Msg("Failed to decode kubeconfig from base64") + } + + kubeConfigPath := "/tmp/kubeconfig" + f, err := os.Create(kubeConfigPath) + if err != nil { + log.Fatal().Err(err).Msg(fmt.Sprintf("Failed to create %s file", kubeConfigPath)) + } + + _, err = f.Write(kubeConfigBytes) + if err != nil { + f.Close() + log.Fatal().Err(err).Msg(fmt.Sprintf("Failed to write %s file", kubeConfigPath)) + } + + f.Close() + cfg.Settings.KubeConfig = kubeConfigPath + } + + cfg.initializeClients() +} diff --git a/pkg/config/main.go b/pkg/config/main.go index 4bf2f83..c39f833 100644 --- a/pkg/config/main.go +++ b/pkg/config/main.go @@ -1,7 +1,20 @@ package config +import ( + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + metrics "k8s.io/metrics/pkg/client/clientset/versioned" + + agentclientset "github.com/iLert/ilert-kube-agent/pkg/client/clientset/versioned" +) + // Config definition type Config struct { + KubeConfig *rest.Config + KubeClient *kubernetes.Clientset + AgentKubeClient *agentclientset.Clientset + MetricsClient *metrics.Clientset + Settings ConfigSettings `yaml:"settings"` Alarms ConfigAlarms `yaml:"alarms"` Links ConfigLinks `yaml:"links"` @@ -12,6 +25,7 @@ type 
ConfigSettings struct { APIKey string `yaml:"apiKey"` KubeConfig string `yaml:"kubeconfig"` Master string `yaml:"master"` + Insecure bool `yaml:"insecure"` Namespace string `yaml:"namespace"` Port int `yaml:"port"` Log ConfigSettingsLog `yaml:"log"` @@ -27,8 +41,9 @@ type ConfigSettingsLog struct { // ConfigAlarms definition type ConfigAlarms struct { - Pods ConfigAlarmsPods `yaml:"pods"` - Nodes ConfigAlarmsNodes `yaml:"nodes"` + Cluster ConfigAlarmSetting `yaml:"cluster"` + Pods ConfigAlarmsPods `yaml:"pods"` + Nodes ConfigAlarmsNodes `yaml:"nodes"` } // ConfigAlarmsPods definition diff --git a/pkg/config/validate.go b/pkg/config/validate.go new file mode 100644 index 0000000..1a78ebb --- /dev/null +++ b/pkg/config/validate.go @@ -0,0 +1,53 @@ +package config + +import ( + "fmt" + + "github.com/rs/zerolog/log" +) + +// Validate analyze config values and throws an error if some problem found +func (cfg *Config) Validate() { + if cfg.Settings.ElectionID == "" { + log.Fatal().Msg("Election ID is required.") + } + + if cfg.Settings.Namespace == "" { + log.Fatal().Msg("Namespace is required. Use --settings.namespace flag or NAMESPACE env var") + } + + if cfg.Settings.APIKey == "" { + log.Fatal().Msg("iLert api key is required. Use --settings.apiKey flag or ILERT_API_KEY env var") + } + + if cfg.Settings.Log.Level != "debug" && cfg.Settings.Log.Level != "info" && cfg.Settings.Log.Level != "warn" && cfg.Settings.Log.Level != "error" && cfg.Settings.Log.Level != "fatal" { + log.Fatal().Msg("Invalid --settings.log.level flag value or config.") + } + + checkPriority(cfg.Alarms.Pods.Terminate.Priority, "--alarms.pods.terminate.priority") + checkPriority(cfg.Alarms.Pods.Waiting.Priority, "--alarms.pods.waiting.priority") + checkPriority(cfg.Alarms.Pods.Restarts.Priority, "--alarms.pods.restarts.priority") + checkPriority(cfg.Alarms.Pods.Resources.CPU.Priority, "--alarms.pods.resources.cpu.priority") + checkPriority(cfg.Alarms.Pods.Resources.Memory.Priority, "--alarms.pods.resources.memory.priority") + checkPriority(cfg.Alarms.Nodes.Terminate.Priority, "--alarms.nodes.terminate.priority") + checkPriority(cfg.Alarms.Nodes.Resources.CPU.Priority, "--alarms.nodes.resources.cpu.priority") + checkPriority(cfg.Alarms.Nodes.Resources.Memory.Priority, "--alarms.nodes.resources.memory.priority") + + checkThreshold(cfg.Alarms.Pods.Resources.CPU.Threshold, 1, 100, "--alarms.pods.resources.cpu.threshold") + checkThreshold(cfg.Alarms.Pods.Resources.Memory.Threshold, 1, 100, "--alarms.pods.resources.memory.threshold") + checkThreshold(cfg.Alarms.Pods.Restarts.Threshold, 1, 1000000, "--alarms.pods.restarts.threshold") + checkThreshold(cfg.Alarms.Pods.Resources.CPU.Threshold, 1, 100, "--alarms.nodes.resources.cpu.threshold") + checkThreshold(cfg.Alarms.Pods.Resources.Memory.Threshold, 1, 100, "--alarms.nodes.resources.memory.threshold") +} + +func checkPriority(priority string, flag string) { + if priority != "HIGH" && priority != "LOW" { + log.Fatal().Msg(fmt.Sprintf("Invalid %s flag value.", flag)) + } +} + +func checkThreshold(threshold int32, min int32, max int32, flag string) { + if threshold < min || threshold > max { + log.Fatal().Msg(fmt.Sprintf("Invalid %s flag value (min=%d max=%d).", flag, min, max)) + } +} diff --git a/pkg/incident/main.go b/pkg/incident/main.go index d0210ee..8c62bf2 100644 --- a/pkg/incident/main.go +++ b/pkg/incident/main.go @@ -32,6 +32,11 @@ func CreateEvent( ilertClient = ilert.NewClient(ilert.WithUserAgent(fmt.Sprintf("ilert-kube-agent/%s", shared.Version))) } + if 
cfg.Settings.APIKey == "" { + log.Error().Msg("Failed to create an incident event. API key is required") + return nil + } + event := &ilert.Event{ IncidentKey: incidentKey, Summary: summary, @@ -68,6 +73,9 @@ func CreateEvent( // CreateIncidentRef definition func CreateIncidentRef(agentKubeClient *agentclientset.Clientset, name string, namespace string, incidentID *int64, summary string, details string, incidentType string) { + if agentKubeClient == nil { + return + } if incidentID != nil && *incidentID > 0 { log.Debug().Int64("incident_id", *incidentID).Str("name", name).Str("namespace", namespace).Msg("Creating incident ref") incident := &v1.Incident{ @@ -91,6 +99,9 @@ func CreateIncidentRef(agentKubeClient *agentclientset.Clientset, name string, n // GetIncidentRef definition func GetIncidentRef(agentKubeClient *agentclientset.Clientset, name string, namespace string) *v1.Incident { + if agentKubeClient == nil { + return nil + } incident, err := agentKubeClient.IlertV1().Incidents(namespace).Get(name, metav1.GetOptions{}) if err != nil { // log.Debug().Err(err).Msg("Failed to get incident ref") @@ -104,6 +115,9 @@ func GetIncidentRef(agentKubeClient *agentclientset.Clientset, name string, name // DeleteIncidentRef definition func DeleteIncidentRef(agentKubeClient *agentclientset.Clientset, name string, namespace string) { + if agentKubeClient == nil { + return + } log.Debug().Str("name", name).Str("namespace", namespace).Msg("Deleting incident ref") err := agentKubeClient.IlertV1().Incidents(namespace).Delete(name, &metav1.DeleteOptions{}) if err != nil { diff --git a/pkg/watcher/cluster.go b/pkg/watcher/cluster.go new file mode 100644 index 0000000..8443db2 --- /dev/null +++ b/pkg/watcher/cluster.go @@ -0,0 +1,100 @@ +package watcher + +import ( + "errors" + "fmt" + + "github.com/iLert/ilert-go" + "github.com/iLert/ilert-kube-agent/pkg/config" + "github.com/iLert/ilert-kube-agent/pkg/incident" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func getClusterKey(cfg *config.Config) string { + return fmt.Sprintf("%s/%s", cfg.Settings.Namespace, cfg.Settings.ElectionID) +} + +func getConfigDetails(cfg *config.Config) string { + details := fmt.Sprintf("Master: %s\nKubeConfig: %s\nElectionID: %s\nNamespace: %s\nInsecure: %v", + cfg.Settings.Master, + cfg.Settings.KubeConfig, + cfg.Settings.ElectionID, + cfg.Settings.Namespace, + cfg.Settings.Insecure) + + return details +} + +func analyzeClusterStatus(cfg *config.Config) error { + clusterKey := getClusterKey(cfg) + + // Init check + incidentKeyInit := fmt.Sprintf("%s-init", clusterKey) + incidentRefInit := incident.GetIncidentRef(cfg.AgentKubeClient, incidentKeyInit, cfg.Settings.Namespace) + + if cfg.KubeClient == nil || cfg.AgentKubeClient == nil || cfg.MetricsClient == nil { + summary := fmt.Sprintf("Cluster connection is not established: %s", clusterKey) + if incidentRefInit == nil && cfg.Alarms.Cluster.Enabled { + details := getConfigDetails(cfg) + incidentID := incident.CreateEvent(cfg, nil, incidentKeyInit, summary, details, ilert.EventTypes.Alert, ilert.IncidentPriorities.High) + incident.CreateIncidentRef(cfg.AgentKubeClient, incidentKeyInit, cfg.Settings.Namespace, incidentID, summary, details, "cluster-init") + } + return errors.New(summary) + } + + if incidentRefInit != nil { + summary := fmt.Sprintf("Cluster connection is established: %s", clusterKey) + incident.CreateEvent(cfg, nil, incidentKeyInit, summary, "", ilert.EventTypes.Resolve, "") + incident.DeleteIncidentRef(cfg.AgentKubeClient, incidentKeyInit, 
cfg.Settings.Namespace) + } + + // Client check + incidentKeyClient := fmt.Sprintf("%s-client", clusterKey) + incidentRefClient := incident.GetIncidentRef(cfg.AgentKubeClient, incidentKeyClient, cfg.Settings.Namespace) + + _, err := cfg.KubeClient.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + if incidentRefClient == nil && cfg.Alarms.Cluster.Enabled { + summary := fmt.Sprintf("Failed to get nodes from apiserver %s", clusterKey) + details := getConfigDetails(cfg) + details += fmt.Sprintf("\n\nError: \n%v", err.Error()) + incidentID := incident.CreateEvent(cfg, nil, incidentKeyClient, summary, details, ilert.EventTypes.Alert, ilert.IncidentPriorities.High) + incident.CreateIncidentRef(cfg.AgentKubeClient, incidentKeyClient, cfg.Settings.Namespace, incidentID, summary, details, "cluster-client") + } + return err + } + + if incidentRefClient != nil { + summary := fmt.Sprintf("Cluster client is ok: %s", clusterKey) + incident.CreateEvent(cfg, nil, incidentKeyClient, summary, "", ilert.EventTypes.Resolve, "") + incident.DeleteIncidentRef(cfg.AgentKubeClient, incidentKeyClient, cfg.Settings.Namespace) + } + + // Cluster health check + incidentKeyHealth := fmt.Sprintf("%s-health", clusterKey) + incidentRefHealth := incident.GetIncidentRef(cfg.AgentKubeClient, incidentKeyHealth, cfg.Settings.Namespace) + path := "/healthz" + content, err := cfg.KubeClient.Discovery().RESTClient().Get().AbsPath(path).DoRaw() + if err != nil { + return err + } + + contentStr := string(content) + if contentStr != "ok" { + summary := fmt.Sprintf("Cluster is not healthy: %s", clusterKey) + if incidentRefHealth == nil && cfg.Alarms.Cluster.Enabled { + details := getConfigDetails(cfg) + incidentID := incident.CreateEvent(cfg, nil, incidentKeyHealth, summary, details, ilert.EventTypes.Alert, ilert.IncidentPriorities.High) + incident.CreateIncidentRef(cfg.AgentKubeClient, incidentKeyHealth, cfg.Settings.Namespace, incidentID, summary, details, "cluster-health") + } + return errors.New(summary) + } + + if incidentRefHealth != nil { + summary := fmt.Sprintf("Cluster is healthy: %s", clusterKey) + incident.CreateEvent(cfg, nil, incidentKeyHealth, summary, "", ilert.EventTypes.Resolve, "") + incident.DeleteIncidentRef(cfg.AgentKubeClient, incidentKeyHealth, cfg.Settings.Namespace) + } + + return nil +} diff --git a/pkg/watcher/main.go b/pkg/watcher/main.go index c3bc64a..413276a 100644 --- a/pkg/watcher/main.go +++ b/pkg/watcher/main.go @@ -2,11 +2,10 @@ package watcher import ( "github.com/rs/zerolog/log" - "k8s.io/client-go/kubernetes" - metrics "k8s.io/metrics/pkg/client/clientset/versioned" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - agentclientset "github.com/iLert/ilert-kube-agent/pkg/client/clientset/versioned" "github.com/iLert/ilert-kube-agent/pkg/config" + "github.com/iLert/ilert-kube-agent/pkg/logger" ) // These are the valid reason for the container waiting @@ -33,16 +32,16 @@ const ( var containerTerminatedReasons = []string{Terminated, OOMKilled, Error, ContainerCannotRun, DeadlineExceeded} // Start starts watcher -func Start(kubeClient *kubernetes.Clientset, metricsClient *metrics.Clientset, agentKubeClient *agentclientset.Clientset, cfg *config.Config) { +func Start(cfg *config.Config) { log.Info().Msg("Start watcher") if cfg.Alarms.Pods.Enabled { - go startPodInformer(kubeClient, agentKubeClient, cfg) - go startPodChecker(kubeClient, metricsClient, agentKubeClient, cfg) + go startPodInformer(cfg) + go startPodChecker(cfg) } if cfg.Alarms.Nodes.Enabled { - go startNodeInformer(kubeClient, 
agentKubeClient, cfg) - go startNodeChecker(kubeClient, metricsClient, agentKubeClient, cfg) + go startNodeInformer(cfg) + go startNodeChecker(cfg) } } @@ -55,3 +54,43 @@ func Stop() { stopNodeInformer() stopNodeMetricsChecker() } + +// RunOnce run watcher runs e.g. serverless call +func RunOnce(cfg *config.Config) { + log.Info().Msg("Run watcher once") + + cfg.Validate() + logger.Init(cfg.Settings.Log) + + err := analyzeClusterStatus(cfg) + if err != nil { + log.Fatal().Err(err).Msg("Failed to check cluster status") + return + } + + if cfg.Alarms.Pods.Enabled { + pods, err := cfg.KubeClient.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{}) + if err != nil { + log.Fatal().Err(err).Msg("Failed to get nodes from apiserver") + } + + for _, pod := range pods.Items { + analyzePodStatus(&pod, cfg) + analyzePodResources(&pod, cfg) + } + } + if cfg.Alarms.Nodes.Enabled { + nodes, err := cfg.KubeClient.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + log.Fatal().Err(err).Msg("Failed to get nodes from apiserver") + } + + log.Debug().Msg("Running nodes resource check") + for _, node := range nodes.Items { + analyzeNodeStatus(&node, cfg) + analyzeNodeResources(&node, cfg) + } + } + + log.Info().Msg("Watcher finished") +} diff --git a/pkg/watcher/node.go b/pkg/watcher/node.go index cae8828..521ff92 100644 --- a/pkg/watcher/node.go +++ b/pkg/watcher/node.go @@ -2,11 +2,16 @@ package watcher import ( "fmt" + "strconv" "github.com/cbroglie/mustache" + "github.com/dustin/go-humanize" "github.com/iLert/ilert-go" "github.com/iLert/ilert-kube-agent/pkg/config" + "github.com/iLert/ilert-kube-agent/pkg/incident" + "github.com/rs/zerolog/log" api "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) @@ -61,3 +66,95 @@ func getNodeLinks(cfg *config.Config, node *api.Node) []ilert.IncidentLink { } return links } + +func analyzeNodeStatus(node *api.Node, cfg *config.Config) { + nodeKey := getNodeKey(node) + incidentRef := incident.GetIncidentRef(cfg.AgentKubeClient, nodeKey, cfg.Settings.Namespace) + + if node.Status.Phase == api.NodeTerminated && cfg.Alarms.Nodes.Terminate.Enabled && incidentRef == nil { + summary := fmt.Sprintf("Node %s terminated", node.GetName()) + details := getNodeDetails(cfg.KubeClient, node) + links := getNodeLinks(cfg, node) + incidentID := incident.CreateEvent(cfg, links, nodeKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Nodes.Terminate.Priority) + incident.CreateIncidentRef(cfg.AgentKubeClient, node.GetName(), cfg.Settings.Namespace, incidentID, summary, details, "terminate") + } +} + +func analyzeNodeResources(node *api.Node, cfg *config.Config) error { + if !cfg.Alarms.Nodes.Resources.Enabled { + return nil + } + nodeKey := getNodeKey(node) + incidentRef := incident.GetIncidentRef(cfg.AgentKubeClient, node.GetName(), cfg.Settings.Namespace) + + nodeMetrics, err := cfg.MetricsClient.MetricsV1beta1().NodeMetricses().Get(node.GetName(), metav1.GetOptions{}) + if err != nil { + log.Debug().Err(err).Msg("Failed to get node metrics") + return err + } + + healthy := true + var memoryUsage int64 + var cpuUsage, cpuLimit float64 + cpuUsageDec := nodeMetrics.Usage.Cpu().AsDec().String() + cpuUsage, err = strconv.ParseFloat(cpuUsageDec, 64) + if err != nil { + cpuUsage = 0 + } + memoryUsage, ok := nodeMetrics.Usage.Memory().AsInt64() + if !ok { + memoryUsage = 0 + } + + if cfg.Alarms.Nodes.Resources.CPU.Enabled { + cpuLimitDec := node.Status.Capacity.Cpu().AsDec().String() + cpuLimit, err = 
strconv.ParseFloat(cpuLimitDec, 64) + if err != nil { + cpuLimit = 0 + } + if ok && cpuLimit > 0 && cpuUsage > 0 { + log.Debug(). + Str("node", node.GetName()). + Float64("limit", cpuLimit). + Float64("usage", cpuUsage). + Msg("Checking CPU limit") + if cpuUsage >= (float64(cfg.Alarms.Nodes.Resources.CPU.Threshold) * (cpuLimit / 100)) { + healthy = false + if incidentRef == nil { + summary := fmt.Sprintf("Node %s CPU limit reached > %d%%", node.GetName(), cfg.Alarms.Nodes.Resources.CPU.Threshold) + details := getNodeDetailsWithUsageLimit(cfg.KubeClient, node, fmt.Sprintf("%.3f CPU", cpuUsage), fmt.Sprintf("%.3f CPU", cpuLimit)) + links := getNodeLinks(cfg, node) + incidentID := incident.CreateEvent(cfg, links, nodeKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Nodes.Resources.CPU.Priority) + incident.CreateIncidentRef(cfg.AgentKubeClient, node.GetName(), cfg.Settings.Namespace, incidentID, summary, details, "resources") + } + } + } + } + + if cfg.Alarms.Nodes.Resources.Memory.Enabled { + memoryLimit, ok := node.Status.Capacity.Memory().AsInt64() + if ok && memoryLimit > 0 && memoryUsage > 0 { + log.Debug(). + Str("node", node.GetName()). + Int64("limit", memoryLimit). + Int64("usage", memoryUsage). + Msg("Checking memory limit") + if memoryUsage >= (int64(cfg.Alarms.Nodes.Resources.Memory.Threshold) * (memoryLimit / 100)) { + healthy = false + if incidentRef == nil { + summary := fmt.Sprintf("Node %s memory limit reached > %d%%", node.GetName(), cfg.Alarms.Nodes.Resources.Memory.Threshold) + details := getNodeDetailsWithUsageLimit(cfg.KubeClient, node, humanize.Bytes(uint64(memoryUsage)), humanize.Bytes(uint64(memoryLimit))) + links := getNodeLinks(cfg, node) + incidentID := incident.CreateEvent(cfg, links, nodeKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Nodes.Resources.Memory.Priority) + incident.CreateIncidentRef(cfg.AgentKubeClient, node.GetName(), cfg.Settings.Namespace, incidentID, summary, details, "resources") + } + } + } + } + + if healthy && incidentRef != nil && incidentRef.Spec.ID > 0 && incidentRef.Spec.Type == "resources" { + incident.CreateEvent(cfg, nil, nodeKey, fmt.Sprintf("Node %s recovered", node.GetName()), "", ilert.EventTypes.Resolve, "") + incident.DeleteIncidentRef(cfg.AgentKubeClient, node.GetName(), cfg.Settings.Namespace) + } + return nil +} diff --git a/pkg/watcher/node_checker.go b/pkg/watcher/node_checker.go index 5c2f294..9e4e9e8 100644 --- a/pkg/watcher/node_checker.go +++ b/pkg/watcher/node_checker.go @@ -2,27 +2,20 @@ package watcher import ( "fmt" - "strconv" - "github.com/dustin/go-humanize" "github.com/robfig/cron/v3" "github.com/rs/zerolog/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - metrics "k8s.io/metrics/pkg/client/clientset/versioned" - "github.com/iLert/ilert-go" - agentclientset "github.com/iLert/ilert-kube-agent/pkg/client/clientset/versioned" "github.com/iLert/ilert-kube-agent/pkg/config" - "github.com/iLert/ilert-kube-agent/pkg/incident" ) var nodeCheckerCron *cron.Cron -func startNodeChecker(kubeClient *kubernetes.Clientset, metricsClient *metrics.Clientset, agentKubeClient *agentclientset.Clientset, cfg *config.Config) { +func startNodeChecker(cfg *config.Config) { nodeCheckerCron = cron.New() nodeCheckerCron.AddFunc(fmt.Sprintf("@every %s", cfg.Settings.CheckInterval), func() { - checkNodes(kubeClient, metricsClient, agentKubeClient, cfg) + checkNodes(cfg) }) log.Info().Msg("Starting nodes checker") @@ -36,87 +29,14 @@ func stopNodeMetricsChecker() { } } -func 
checkNodes(kubeClient *kubernetes.Clientset, metricsClient *metrics.Clientset, agentKubeClient *agentclientset.Clientset, cfg *config.Config) { - nodes, err := kubeClient.CoreV1().Nodes().List(metav1.ListOptions{}) +func checkNodes(cfg *config.Config) { + nodes, err := cfg.KubeClient.CoreV1().Nodes().List(metav1.ListOptions{}) if err != nil { log.Fatal().Err(err).Msg("Failed to get nodes from apiserver") } - if cfg.Alarms.Nodes.Resources.Enabled { - log.Debug().Msg("Running nodes resource check") - for _, node := range nodes.Items { - nodeKey := getNodeKey(&node) - incidentRef := incident.GetIncidentRef(agentKubeClient, node.GetName(), cfg.Settings.Namespace) - - nodeMetrics, err := metricsClient.MetricsV1beta1().NodeMetricses().Get(node.GetName(), metav1.GetOptions{}) - if err != nil { - log.Debug().Err(err).Msg("Failed to get node metrics") - continue - } - - healthy := true - var memoryUsage int64 - var cpuUsage, cpuLimit float64 - cpuUsageDec := nodeMetrics.Usage.Cpu().AsDec().String() - cpuUsage, err = strconv.ParseFloat(cpuUsageDec, 64) - if err != nil { - cpuUsage = 0 - } - memoryUsage, ok := nodeMetrics.Usage.Memory().AsInt64() - if !ok { - memoryUsage = 0 - } - - if cfg.Alarms.Nodes.Resources.CPU.Enabled { - cpuLimitDec := node.Status.Capacity.Cpu().AsDec().String() - cpuLimit, err = strconv.ParseFloat(cpuLimitDec, 64) - if err != nil { - cpuLimit = 0 - } - if ok && cpuLimit > 0 && cpuUsage > 0 { - log.Debug(). - Str("node", node.GetName()). - Float64("limit", cpuLimit). - Float64("usage", cpuUsage). - Msg("Checking CPU limit") - if cpuUsage >= (float64(cfg.Alarms.Nodes.Resources.CPU.Threshold) * (cpuLimit / 100)) { - healthy = false - if incidentRef == nil { - summary := fmt.Sprintf("Node %s CPU limit reached > %d%%", node.GetName(), cfg.Alarms.Nodes.Resources.CPU.Threshold) - details := getNodeDetailsWithUsageLimit(kubeClient, &node, fmt.Sprintf("%.3f CPU", cpuUsage), fmt.Sprintf("%.3f CPU", cpuLimit)) - links := getNodeLinks(cfg, &node) - incidentID := incident.CreateEvent(cfg, links, nodeKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Nodes.Resources.CPU.Priority) - incident.CreateIncidentRef(agentKubeClient, node.GetName(), cfg.Settings.Namespace, incidentID, summary, details, "resources") - } - } - } - } - - if cfg.Alarms.Nodes.Resources.Memory.Enabled { - memoryLimit, ok := node.Status.Capacity.Memory().AsInt64() - if ok && memoryLimit > 0 && memoryUsage > 0 { - log.Debug(). - Str("node", node.GetName()). - Int64("limit", memoryLimit). - Int64("usage", memoryUsage). 
- Msg("Checking memory limit") - if memoryUsage >= (int64(cfg.Alarms.Nodes.Resources.Memory.Threshold) * (memoryLimit / 100)) { - healthy = false - if incidentRef == nil { - summary := fmt.Sprintf("Node %s memory limit reached > %d%%", node.GetName(), cfg.Alarms.Nodes.Resources.Memory.Threshold) - details := getNodeDetailsWithUsageLimit(kubeClient, &node, humanize.Bytes(uint64(memoryUsage)), humanize.Bytes(uint64(memoryLimit))) - links := getNodeLinks(cfg, &node) - incidentID := incident.CreateEvent(cfg, links, nodeKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Nodes.Resources.Memory.Priority) - incident.CreateIncidentRef(agentKubeClient, node.GetName(), cfg.Settings.Namespace, incidentID, summary, details, "resources") - } - } - } - } - - if healthy && incidentRef != nil && incidentRef.Spec.ID > 0 && incidentRef.Spec.Type == "resources" { - incident.CreateEvent(cfg, nil, nodeKey, fmt.Sprintf("Node %s recovered", node.GetName()), "", ilert.EventTypes.Resolve, "") - incident.DeleteIncidentRef(agentKubeClient, node.GetName(), cfg.Settings.Namespace) - } - } + log.Debug().Msg("Running nodes resource check") + for _, node := range nodes.Items { + analyzeNodeResources(&node, cfg) } } diff --git a/pkg/watcher/node_informer.go b/pkg/watcher/node_informer.go index 4144e52..3b01ce1 100644 --- a/pkg/watcher/node_informer.go +++ b/pkg/watcher/node_informer.go @@ -1,41 +1,25 @@ package watcher import ( - "fmt" - "github.com/rs/zerolog/log" api "k8s.io/api/core/v1" "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - "github.com/iLert/ilert-go" - agentclientset "github.com/iLert/ilert-kube-agent/pkg/client/clientset/versioned" "github.com/iLert/ilert-kube-agent/pkg/config" - "github.com/iLert/ilert-kube-agent/pkg/incident" ) var nodeInformerStopper chan struct{} -func startNodeInformer(kubeClient *kubernetes.Clientset, agentKubeClient *agentclientset.Clientset, cfg *config.Config) { - factory := informers.NewSharedInformerFactory(kubeClient, 0) +func startNodeInformer(cfg *config.Config) { + factory := informers.NewSharedInformerFactory(cfg.KubeClient, 0) nodeInformer := factory.Core().V1().Nodes().Informer() nodeInformerStopper = make(chan struct{}) nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(oldObj interface{}, newObj interface{}) { node := newObj.(*api.Node) - nodeKey := getNodeKey(node) - - incidentRef := incident.GetIncidentRef(agentKubeClient, nodeKey, cfg.Settings.Namespace) log.Debug().Interface("node_name", node.GetName()).Msg("Update Node") - - if node.Status.Phase == api.NodeTerminated && incidentRef == nil { - summary := fmt.Sprintf("Node %s terminated", node.GetName()) - details := getNodeDetails(kubeClient, node) - links := getNodeLinks(cfg, node) - incidentID := incident.CreateEvent(cfg, links, nodeKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Nodes.Terminate.Priority) - incident.CreateIncidentRef(agentKubeClient, node.GetName(), cfg.Settings.Namespace, incidentID, summary, details, "terminate") - } + analyzeNodeStatus(node, cfg) }, }) diff --git a/pkg/watcher/pod.go b/pkg/watcher/pod.go index b614047..b4769f0 100644 --- a/pkg/watcher/pod.go +++ b/pkg/watcher/pod.go @@ -2,14 +2,20 @@ package watcher import ( "bytes" + "errors" "fmt" "io" + "strconv" "github.com/cbroglie/mustache" + "github.com/dustin/go-humanize" "github.com/iLert/ilert-go" "github.com/iLert/ilert-kube-agent/pkg/config" + "github.com/iLert/ilert-kube-agent/pkg/incident" "github.com/iLert/ilert-kube-agent/pkg/utils" + 
"github.com/rs/zerolog/log" api "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) @@ -105,3 +111,137 @@ func getPodLinks(cfg *config.Config, node *api.Pod) []ilert.IncidentLink { } return links } + +func analyzePodStatus(pod *api.Pod, cfg *config.Config) { + podKey := getPodKey(pod) + incidentRef := incident.GetIncidentRef(cfg.AgentKubeClient, pod.GetName(), pod.GetNamespace()) + + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.State.Terminated != nil && + utils.StringContains(containerTerminatedReasons, containerStatus.State.Terminated.Reason) && + cfg.Alarms.Pods.Terminate.Enabled && incidentRef == nil { + summary := fmt.Sprintf("Pod %s/%s terminated - %s", pod.GetNamespace(), pod.GetName(), containerStatus.State.Terminated.Reason) + details := getPodDetailsWithStatus(cfg.KubeClient, pod, &containerStatus) + links := getPodLinks(cfg, pod) + incidentID := incident.CreateEvent(cfg, links, podKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Pods.Terminate.Priority) + incident.CreateIncidentRef(cfg.AgentKubeClient, pod.GetName(), pod.GetNamespace(), incidentID, summary, details, "terminate") + break + } + + if containerStatus.State.Waiting != nil && + utils.StringContains(containerWaitingReasons, containerStatus.State.Waiting.Reason) && + cfg.Alarms.Pods.Waiting.Enabled && incidentRef == nil { + summary := fmt.Sprintf("Pod %s/%s waiting - %s", pod.GetNamespace(), pod.GetName(), containerStatus.State.Waiting.Reason) + details := getPodDetailsWithStatus(cfg.KubeClient, pod, &containerStatus) + links := getPodLinks(cfg, pod) + incidentID := incident.CreateEvent(cfg, links, podKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Pods.Waiting.Priority) + incident.CreateIncidentRef(cfg.AgentKubeClient, pod.GetName(), pod.GetNamespace(), incidentID, summary, details, "waiting") + break + } + + if cfg.Alarms.Pods.Restarts.Enabled && containerStatus.RestartCount >= cfg.Alarms.Pods.Restarts.Threshold && incidentRef == nil { + summary := fmt.Sprintf("Pod %s/%s restarts threshold reached: %d", pod.GetNamespace(), pod.GetName(), containerStatus.RestartCount) + details := getPodDetailsWithStatus(cfg.KubeClient, pod, &containerStatus) + links := getPodLinks(cfg, pod) + incidentID := incident.CreateEvent(cfg, links, podKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Pods.Restarts.Priority) + incident.CreateIncidentRef(cfg.AgentKubeClient, pod.GetName(), pod.GetNamespace(), incidentID, summary, details, "restarts") + break + } + } +} + +func analyzePodResources(pod *api.Pod, cfg *config.Config) error { + if !cfg.Alarms.Pods.Resources.Enabled { + return nil + } + + podKey := getPodKey(pod) + incidentRef := incident.GetIncidentRef(cfg.AgentKubeClient, pod.GetName(), pod.GetNamespace()) + + podMetrics, err := cfg.MetricsClient.MetricsV1beta1().PodMetricses(pod.GetNamespace()).Get(pod.GetName(), metav1.GetOptions{}) + if err != nil { + log.Debug().Err(err).Msg("Failed to get pod metrics") + return err + } + + healthy := true + podContainers := pod.Spec.Containers + for _, container := range podContainers { + metricsContainer := getContainerByName(container.Name, podMetrics.Containers) + if metricsContainer == nil { + log.Warn(). + Str("pod", pod.GetName()). + Str("namespace", pod.GetNamespace()). + Str("container", container.Name). 
+ Msg("Could not find container for metrics data") + return errors.New("Could not find container for metrics data") + } + var memoryUsage int64 + var cpuUsage, cpuLimit float64 + cpuUsageDec := metricsContainer.Usage.Cpu().AsDec().String() + cpuUsage, err = strconv.ParseFloat(cpuUsageDec, 64) + if err != nil { + cpuUsage = 0 + } + memoryUsage, ok := metricsContainer.Usage.Memory().AsInt64() + if !ok { + memoryUsage = 0 + } + + if cfg.Alarms.Pods.Resources.CPU.Enabled && cpuUsage > 0 && container.Resources.Limits.Cpu() != nil { + cpuLimitDec := container.Resources.Limits.Cpu().AsDec().String() + cpuLimit, err = strconv.ParseFloat(cpuLimitDec, 64) + if err != nil { + cpuLimit = 0 + } + if cpuLimit > 0 { + log.Debug(). + Str("pod", pod.GetName()). + Str("namespace", pod.GetNamespace()). + Str("container", container.Name). + Float64("limit", cpuLimit). + Float64("usage", cpuUsage). + Msg("Checking CPU limit") + if cpuUsage >= (float64(cfg.Alarms.Pods.Resources.CPU.Threshold) * (cpuLimit / 100)) { + healthy = false + if incidentRef == nil { + summary := fmt.Sprintf("Pod %s/%s CPU limit reached > %d%%", pod.GetNamespace(), pod.GetName(), cfg.Alarms.Pods.Resources.CPU.Threshold) + details := getPodDetailsWithUsageLimit(cfg.KubeClient, pod, fmt.Sprintf("%.3f CPU", cpuUsage), fmt.Sprintf("%.3f CPU", cpuLimit)) + links := getPodLinks(cfg, pod) + incidentID := incident.CreateEvent(cfg, links, podKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Pods.Resources.CPU.Priority) + incident.CreateIncidentRef(cfg.AgentKubeClient, pod.GetName(), pod.GetNamespace(), incidentID, summary, details, "resources") + } + } + } + } + + if cfg.Alarms.Pods.Resources.Memory.Enabled && memoryUsage > 0 && container.Resources.Limits.Memory() != nil { + memoryLimit, ok := container.Resources.Limits.Memory().AsInt64() + if ok && memoryLimit > 0 { + log.Debug(). + Str("pod", pod.GetName()). + Str("namespace", pod.GetNamespace()). + Str("container", container.Name). + Int64("limit", memoryLimit). + Int64("usage", memoryUsage). 
+ Msg("Checking memory limit") + if memoryUsage >= (int64(cfg.Alarms.Pods.Resources.Memory.Threshold) * (memoryLimit / 100)) { + healthy = false + if incidentRef == nil { + summary := fmt.Sprintf("Pod %s/%s memory limit reached > %d%%", pod.GetNamespace(), pod.GetName(), cfg.Alarms.Pods.Resources.Memory.Threshold) + details := getPodDetailsWithUsageLimit(cfg.KubeClient, pod, humanize.Bytes(uint64(memoryUsage)), humanize.Bytes(uint64(memoryLimit))) + links := getPodLinks(cfg, pod) + incidentID := incident.CreateEvent(cfg, links, podKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Pods.Resources.Memory.Priority) + incident.CreateIncidentRef(cfg.AgentKubeClient, pod.GetName(), pod.GetNamespace(), incidentID, summary, details, "resources") + } + } + } + } + } + if healthy && incidentRef != nil && incidentRef.Spec.ID > 0 && incidentRef.Spec.Type == "resources" { + incident.CreateEvent(cfg, nil, podKey, fmt.Sprintf("Pod %s/%s recovered", pod.GetNamespace(), pod.GetName()), "", ilert.EventTypes.Resolve, "") + incident.DeleteIncidentRef(cfg.AgentKubeClient, pod.GetName(), pod.GetNamespace()) + } + + return nil +} diff --git a/pkg/watcher/pod_checker.go b/pkg/watcher/pod_checker.go index 60f9cea..30574cf 100644 --- a/pkg/watcher/pod_checker.go +++ b/pkg/watcher/pod_checker.go @@ -2,28 +2,21 @@ package watcher import ( "fmt" - "strconv" - "github.com/dustin/go-humanize" "github.com/robfig/cron/v3" "github.com/rs/zerolog/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" "k8s.io/metrics/pkg/apis/metrics/v1beta1" - metrics "k8s.io/metrics/pkg/client/clientset/versioned" - "github.com/iLert/ilert-go" - agentclientset "github.com/iLert/ilert-kube-agent/pkg/client/clientset/versioned" "github.com/iLert/ilert-kube-agent/pkg/config" - "github.com/iLert/ilert-kube-agent/pkg/incident" ) var podCheckerCron *cron.Cron -func startPodChecker(kubeClient *kubernetes.Clientset, metricsClient *metrics.Clientset, agentKubeClient *agentclientset.Clientset, cfg *config.Config) { +func startPodChecker(cfg *config.Config) { podCheckerCron = cron.New() podCheckerCron.AddFunc(fmt.Sprintf("@every %s", cfg.Settings.CheckInterval), func() { - checkPods(kubeClient, metricsClient, agentKubeClient, cfg) + checkPods(cfg) }) log.Info().Msg("Starting pods checker") @@ -37,8 +30,8 @@ func stopPodMetricsChecker() { } } -func checkPods(kubeClient *kubernetes.Clientset, metricsClient *metrics.Clientset, agentKubeClient *agentclientset.Clientset, cfg *config.Config) { - pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{}) +func checkPods(cfg *config.Config) { + pods, err := cfg.KubeClient.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{}) if err != nil { log.Fatal().Err(err).Msg("Failed to get nodes from apiserver") } @@ -46,93 +39,7 @@ func checkPods(kubeClient *kubernetes.Clientset, metricsClient *metrics.Clientse if cfg.Alarms.Pods.Resources.Enabled { log.Debug().Msg("Running pods resource check") for _, pod := range pods.Items { - podKey := getPodKey(&pod) - incidentRef := incident.GetIncidentRef(agentKubeClient, pod.GetName(), pod.GetNamespace()) - - podMetrics, err := metricsClient.MetricsV1beta1().PodMetricses(pod.GetNamespace()).Get(pod.GetName(), metav1.GetOptions{}) - if err != nil { - log.Debug().Err(err).Msg("Failed to get pod metrics") - continue - } - - healthy := true - podContainers := pod.Spec.Containers - for _, container := range podContainers { - metricsContainer := getContainerByName(container.Name, podMetrics.Containers) - if 
metricsContainer == nil { - log.Warn(). - Str("pod", pod.GetName()). - Str("namespace", pod.GetNamespace()). - Str("container", container.Name). - Msg("Could not find container for metrics data") - continue - } - var memoryUsage int64 - var cpuUsage, cpuLimit float64 - cpuUsageDec := metricsContainer.Usage.Cpu().AsDec().String() - cpuUsage, err = strconv.ParseFloat(cpuUsageDec, 64) - if err != nil { - cpuUsage = 0 - } - memoryUsage, ok := metricsContainer.Usage.Memory().AsInt64() - if !ok { - memoryUsage = 0 - } - - if cfg.Alarms.Pods.Resources.CPU.Enabled && cpuUsage > 0 && container.Resources.Limits.Cpu() != nil { - cpuLimitDec := container.Resources.Limits.Cpu().AsDec().String() - cpuLimit, err = strconv.ParseFloat(cpuLimitDec, 64) - if err != nil { - cpuLimit = 0 - } - if cpuLimit > 0 { - log.Debug(). - Str("pod", pod.GetName()). - Str("namespace", pod.GetNamespace()). - Str("container", container.Name). - Float64("limit", cpuLimit). - Float64("usage", cpuUsage). - Msg("Checking CPU limit") - if cpuUsage >= (float64(cfg.Alarms.Pods.Resources.CPU.Threshold) * (cpuLimit / 100)) { - healthy = false - if incidentRef == nil { - summary := fmt.Sprintf("Pod %s/%s CPU limit reached > %d%%", pod.GetNamespace(), pod.GetName(), cfg.Alarms.Pods.Resources.CPU.Threshold) - details := getPodDetailsWithUsageLimit(kubeClient, &pod, fmt.Sprintf("%.3f CPU", cpuUsage), fmt.Sprintf("%.3f CPU", cpuLimit)) - links := getPodLinks(cfg, &pod) - incidentID := incident.CreateEvent(cfg, links, podKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Pods.Resources.CPU.Priority) - incident.CreateIncidentRef(agentKubeClient, pod.GetName(), pod.GetNamespace(), incidentID, summary, details, "resources") - } - } - } - } - - if cfg.Alarms.Pods.Resources.Memory.Enabled && memoryUsage > 0 && container.Resources.Limits.Memory() != nil { - memoryLimit, ok := container.Resources.Limits.Memory().AsInt64() - if ok && memoryLimit > 0 { - log.Debug(). - Str("pod", pod.GetName()). - Str("namespace", pod.GetNamespace()). - Str("container", container.Name). - Int64("limit", memoryLimit). - Int64("usage", memoryUsage). 
- Msg("Checking memory limit") - if memoryUsage >= (int64(cfg.Alarms.Pods.Resources.Memory.Threshold) * (memoryLimit / 100)) { - healthy = false - if incidentRef == nil { - summary := fmt.Sprintf("Pod %s/%s memory limit reached > %d%%", pod.GetNamespace(), pod.GetName(), cfg.Alarms.Pods.Resources.Memory.Threshold) - details := getPodDetailsWithUsageLimit(kubeClient, &pod, humanize.Bytes(uint64(memoryUsage)), humanize.Bytes(uint64(memoryLimit))) - links := getPodLinks(cfg, &pod) - incidentID := incident.CreateEvent(cfg, links, podKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Pods.Resources.Memory.Priority) - incident.CreateIncidentRef(agentKubeClient, pod.GetName(), pod.GetNamespace(), incidentID, summary, details, "resources") - } - } - } - } - } - if healthy && incidentRef != nil && incidentRef.Spec.ID > 0 && incidentRef.Spec.Type == "resources" { - incident.CreateEvent(cfg, nil, podKey, fmt.Sprintf("Pod %s/%s recovered", pod.GetNamespace(), pod.GetName()), "", ilert.EventTypes.Resolve, "") - incident.DeleteIncidentRef(agentKubeClient, pod.GetName(), pod.GetNamespace()) - } + analyzePodResources(&pod, cfg) } } } diff --git a/pkg/watcher/pod_informer.go b/pkg/watcher/pod_informer.go index b11a2a5..bd6b9f0 100644 --- a/pkg/watcher/pod_informer.go +++ b/pkg/watcher/pod_informer.go @@ -1,71 +1,31 @@ package watcher import ( - "fmt" - - "github.com/iLert/ilert-go" "github.com/rs/zerolog/log" api "k8s.io/api/core/v1" "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - agentclientset "github.com/iLert/ilert-kube-agent/pkg/client/clientset/versioned" "github.com/iLert/ilert-kube-agent/pkg/config" "github.com/iLert/ilert-kube-agent/pkg/incident" - "github.com/iLert/ilert-kube-agent/pkg/utils" ) var podInformerStopper chan struct{} -func startPodInformer(kubeClient *kubernetes.Clientset, agentKubeClient *agentclientset.Clientset, cfg *config.Config) { - factory := informers.NewSharedInformerFactory(kubeClient, 0) +func startPodInformer(cfg *config.Config) { + factory := informers.NewSharedInformerFactory(cfg.KubeClient, 0) podInformer := factory.Core().V1().Pods().Informer() podInformerStopper = make(chan struct{}) podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(oldObj interface{}, newObj interface{}) { pod := newObj.(*api.Pod) log.Debug().Interface("pod", pod.GetName()).Msg("Update Pod") - podKey := getPodKey(pod) - incidentRef := incident.GetIncidentRef(agentKubeClient, pod.GetName(), pod.GetNamespace()) - - for _, containerStatus := range pod.Status.ContainerStatuses { - if containerStatus.State.Terminated != nil && - utils.StringContains(containerTerminatedReasons, containerStatus.State.Terminated.Reason) && - cfg.Alarms.Pods.Terminate.Enabled && incidentRef == nil { - summary := fmt.Sprintf("Pod %s/%s terminated - %s", pod.GetNamespace(), pod.GetName(), containerStatus.State.Terminated.Reason) - details := getPodDetailsWithStatus(kubeClient, pod, &containerStatus) - links := getPodLinks(cfg, pod) - incidentID := incident.CreateEvent(cfg, links, podKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Pods.Terminate.Priority) - incident.CreateIncidentRef(agentKubeClient, pod.GetName(), pod.GetNamespace(), incidentID, summary, details, "terminate") - break - } - - if containerStatus.State.Waiting != nil && - utils.StringContains(containerWaitingReasons, containerStatus.State.Waiting.Reason) && - cfg.Alarms.Pods.Waiting.Enabled && incidentRef == nil { - summary := fmt.Sprintf("Pod %s/%s waiting - %s", 
pod.GetNamespace(), pod.GetName(), containerStatus.State.Waiting.Reason) - details := getPodDetailsWithStatus(kubeClient, pod, &containerStatus) - links := getPodLinks(cfg, pod) - incidentID := incident.CreateEvent(cfg, links, podKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Pods.Waiting.Priority) - incident.CreateIncidentRef(agentKubeClient, pod.GetName(), pod.GetNamespace(), incidentID, summary, details, "waiting") - break - } - - if cfg.Alarms.Pods.Restarts.Enabled && containerStatus.RestartCount >= cfg.Alarms.Pods.Restarts.Threshold && incidentRef == nil { - summary := fmt.Sprintf("Pod %s/%s restarts threshold reached: %d", pod.GetNamespace(), pod.GetName(), containerStatus.RestartCount) - details := getPodDetailsWithStatus(kubeClient, pod, &containerStatus) - links := getPodLinks(cfg, pod) - incidentID := incident.CreateEvent(cfg, links, podKey, summary, details, ilert.EventTypes.Alert, cfg.Alarms.Pods.Restarts.Priority) - incident.CreateIncidentRef(agentKubeClient, pod.GetName(), pod.GetNamespace(), incidentID, summary, details, "restarts") - break - } - } + analyzePodStatus(pod, cfg) }, DeleteFunc: func(obj interface{}) { pod := obj.(*api.Pod) log.Debug().Interface("pod", pod.Name).Msg("Delete Pod") - incident.DeleteIncidentRef(agentKubeClient, pod.GetName(), pod.GetNamespace()) + incident.DeleteIncidentRef(cfg.AgentKubeClient, pod.GetName(), pod.GetNamespace()) }, }) diff --git a/version.go b/version.go index 973f9cd..82b5a04 100644 --- a/version.go +++ b/version.go @@ -1,7 +1,7 @@ package shared // Version current version -const Version = "v1.5.0" +const Version = "v1.6.0" // App name const App = "ilert-kube-agent"
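Note: the following is a minimal, hypothetical sketch of how the run-once mode added in this patch could be driven from Go, e.g. from a CronJob or serverless handler. It only uses the `config` and `watcher` APIs introduced above; the config file path is a placeholder and the file is expected to provide the required settings (apiKey, electionID, namespace, log) and alarm priorities/thresholds, as in the repository's `config.yaml`.

```go
package main

import (
	"github.com/iLert/ilert-kube-agent/pkg/config"
	"github.com/iLert/ilert-kube-agent/pkg/watcher"
)

func main() {
	cfg := &config.Config{}

	// Placeholder path: the file should define the settings and alarm sections,
	// like the repository's config.yaml.
	cfg.SetConfigFile("/etc/ilert-kube-agent/config.yaml")

	// Mark this invocation as a one-shot run instead of the long-running agent.
	cfg.SetRunOnce(true)

	// Load reads the config file, ILERT_* environment variables and the optional
	// base64-encoded KUBECONFIG, validates the settings and initializes the
	// Kubernetes, metrics and agent clients.
	cfg.Load()

	// RunOnce performs the cluster, pod and node checks a single time and returns.
	watcher.RunOnce(cfg)
}
```

The same one-shot behavior is available from the agent binary itself via the new `--run-once` flag.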