From c8032f771e7d004db4a04c91c93a38a1f9bd4478 Mon Sep 17 00:00:00 2001
From: Srikanth Chekuri
Date: Fri, 31 Jan 2025 20:37:04 +0530
Subject: [PATCH] chore: add k8s metrics receiving status (#6977)

---
Review notes (below the "---", so not part of the commit message):

- SendingRequiredMetadata treated every label value that arrived as a plain
  string as "present", although JSONExtractString yields '' for a missing key.
  Presence is now decided by a non-empty value for both string and *string
  values, and a nil *string no longer panics.
- getK8sInfraOnboardingStatus starts from an empty list instead of a nil slice,
  so the early-return path has the same shape as the full response.
- Doc comments added. Hunk lengths and the diffstat were recomputed; the blob
  ids on the "index" lines predate these edits.

 pkg/query-service/app/http_handler.go        |   3 +
 pkg/query-service/app/infra.go               |  55 ++++++++
 pkg/query-service/app/inframetrics/common.go |  78 ++++++++++++
 pkg/query-service/app/inframetrics/nodes.go  |  20 +++
 pkg/query-service/app/inframetrics/pods.go   | 118 +++++++++++++++++++
 pkg/query-service/model/infra.go             |  28 ++++
 6 files changed, 302 insertions(+)

diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go
index acaa658daee..4b62ec65d2d 100644
--- a/pkg/query-service/app/http_handler.go
+++ b/pkg/query-service/app/http_handler.go
@@ -448,6 +448,9 @@ func (aH *APIHandler) RegisterInfraMetricsRoutes(router *mux.Router, am *AuthMid
 	jobsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getJobAttributeKeys)).Methods(http.MethodGet)
 	jobsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getJobAttributeValues)).Methods(http.MethodGet)
 	jobsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getJobList)).Methods(http.MethodPost)
+
+	infraOnboardingSubRouter := router.PathPrefix("/api/v1/infra_onboarding").Subrouter()
+	infraOnboardingSubRouter.HandleFunc("/k8s/status", am.ViewAccess(aH.getK8sInfraOnboardingStatus)).Methods(http.MethodGet)
 }
 
 func (aH *APIHandler) RegisterWebSocketPaths(router *mux.Router, am *AuthMiddleware) {
diff --git a/pkg/query-service/app/infra.go b/pkg/query-service/app/infra.go
index b48fb06c35b..eb32af3e083 100644
--- a/pkg/query-service/app/infra.go
+++ b/pkg/query-service/app/infra.go
@@ -597,3 +597,58 @@ func (aH *APIHandler) getPvcAttributeValues(w http.ResponseWriter, r *http.Reque
 
 	aH.Respond(w, values)
 }
+
+// getK8sInfraOnboardingStatus responds with an OnboardingStatus describing
+// which groups of k8s metrics (pod, cluster, node, optional pod metrics) were
+// found and which pods are missing required metadata. If no pod metrics were
+// found it responds immediately with every flag false.
+func (aH *APIHandler) getK8sInfraOnboardingStatus(w http.ResponseWriter, r *http.Request) {
+	ctx := r.Context()
+
+	// Start with an empty, non-nil list so the early return below carries the
+	// same kind of list value as the full response instead of a nil slice.
+	status := model.OnboardingStatus{IsSendingRequiredMetadata: []model.PodOnboardingStatus{}}
+
+	didSendPodMetrics, err := aH.podsRepo.DidSendPodMetrics(ctx)
+	if err != nil {
+		RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
+		return
+	}
+
+	if !didSendPodMetrics {
+		aH.Respond(w, status)
+		return
+	}
+
+	didSendClusterMetrics, err := aH.podsRepo.DidSendClusterMetrics(ctx)
+	if err != nil {
+		RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
+		return
+	}
+
+	didSendNodeMetrics, err := aH.nodesRepo.DidSendNodeMetrics(ctx)
+	if err != nil {
+		RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
+		return
+	}
+
+	didSendOptionalPodMetrics, err := aH.podsRepo.IsSendingOptionalPodMetrics(ctx)
+	if err != nil {
+		RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
+		return
+	}
+
+	requiredMetadata, err := aH.podsRepo.SendingRequiredMetadata(ctx)
+	if err != nil {
+		RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
+		return
+	}
+
+	status.DidSendPodMetrics = didSendPodMetrics
+	status.DidSendClusterMetrics = didSendClusterMetrics
+	status.DidSendNodeMetrics = didSendNodeMetrics
+	status.IsSendingOptionalPodMetrics = didSendOptionalPodMetrics
+	status.IsSendingRequiredMetadata = requiredMetadata
+
+	aH.Respond(w, status)
+}
diff --git a/pkg/query-service/app/inframetrics/common.go b/pkg/query-service/app/inframetrics/common.go
index b57cc7ee412..d2012ef2a56 100644
--- a/pkg/query-service/app/inframetrics/common.go
+++ b/pkg/query-service/app/inframetrics/common.go
@@ -8,6 +8,84 @@ import (
 	"go.signoz.io/signoz/pkg/query-service/model"
 )
 
+var (
+	// TODO(srikanthccv): import metadata yaml from receivers and use generated files to check the metrics
+	podMetricNamesToCheck = []string{
+		"k8s_pod_cpu_utilization",
+		"k8s_pod_memory_usage",
+		"k8s_pod_cpu_request_utilization",
+		"k8s_pod_memory_request_utilization",
+		"k8s_pod_cpu_limit_utilization",
+		"k8s_pod_memory_limit_utilization",
+		"k8s_container_restarts",
+		"k8s_pod_phase",
+	}
+	nodeMetricNamesToCheck = []string{
+		"k8s_node_cpu_utilization",
+		"k8s_node_allocatable_cpu",
+		"k8s_node_memory_usage",
+		"k8s_node_allocatable_memory",
+		"k8s_node_condition_ready",
+	}
+	clusterMetricNamesToCheck = []string{
+		"k8s_daemonset_desired_scheduled_nodes",
+		"k8s_daemonset_current_scheduled_nodes",
+		"k8s_deployment_desired",
+		"k8s_deployment_available",
+		"k8s_job_desired_successful_pods",
+		"k8s_job_active_pods",
+		"k8s_job_failed_pods",
+		"k8s_job_successful_pods",
+		"k8s_statefulset_desired_pods",
+		"k8s_statefulset_current_pods",
+	}
+	optionalPodMetricNamesToCheck = []string{
+		"k8s_pod_cpu_request_utilization",
+		"k8s_pod_memory_request_utilization",
+		"k8s_pod_cpu_limit_utilization",
+		"k8s_pod_memory_limit_utilization",
+	}
+
+	// did they ever send _any_ pod metrics?
+	didSendPodMetricsQuery = `
+	SELECT count() FROM %s.%s WHERE metric_name IN (%s)
+`
+
+	// did they ever send any node metrics?
+	didSendNodeMetricsQuery = `
+	SELECT count() FROM %s.%s WHERE metric_name IN (%s)
+`
+
+	// did they ever send any cluster metrics?
+	didSendClusterMetricsQuery = `
+	SELECT count() FROM %s.%s WHERE metric_name IN (%s)
+`
+
+	// if they ever sent _any_ pod metrics, we assume they know how to send pod metrics
+	// now, are they sending optional pod metrics such as request/limit metrics?
+	isSendingOptionalPodMetricsQuery = `
+	SELECT count() FROM %s.%s WHERE metric_name IN (%s)
+`
+
+	// there should be [cluster, node, namespace, one of (deployment, statefulset, daemonset, cronjob, job)] for each pod
+	isSendingRequiredMetadataQuery = `
+	SELECT any(JSONExtractString(labels, 'k8s_cluster_name')) as k8s_cluster_name,
+		any(JSONExtractString(labels, 'k8s_node_name')) as k8s_node_name,
+		any(JSONExtractString(labels, 'k8s_namespace_name')) as k8s_namespace_name,
+		any(JSONExtractString(labels, 'k8s_deployment_name')) as k8s_deployment_name,
+		any(JSONExtractString(labels, 'k8s_statefulset_name')) as k8s_statefulset_name,
+		any(JSONExtractString(labels, 'k8s_daemonset_name')) as k8s_daemonset_name,
+		any(JSONExtractString(labels, 'k8s_cronjob_name')) as k8s_cronjob_name,
+		any(JSONExtractString(labels, 'k8s_job_name')) as k8s_job_name,
+		JSONExtractString(labels, 'k8s_pod_name') as k8s_pod_name
+	FROM %s.%s WHERE metric_name IN (%s)
+		AND (unix_milli >= (toUnixTimestamp(now() - toIntervalMinute(60)) * 1000))
+		AND JSONExtractString(labels, 'k8s_namespace_name') NOT IN ('kube-system', 'kube-public', 'kube-node-lease', 'metallb-system')
+	GROUP BY k8s_pod_name
+	LIMIT 1 BY k8s_cluster_name, k8s_node_name, k8s_namespace_name
+`
+)
+
 // getParamsForTopItems returns the step, time series table name and samples table name
 // for the top items query. what are we doing here?
 // we want to identify the top hosts/pods/nodes quickly, so we use pre-aggregated data
diff --git a/pkg/query-service/app/inframetrics/nodes.go b/pkg/query-service/app/inframetrics/nodes.go
index 3a941563a1f..3515e662fd1 100644
--- a/pkg/query-service/app/inframetrics/nodes.go
+++ b/pkg/query-service/app/inframetrics/nodes.go
@@ -2,11 +2,14 @@ package inframetrics
 
 import (
 	"context"
+	"fmt"
 	"math"
 	"sort"
+	"strings"
 
 	"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
 	"go.signoz.io/signoz/pkg/query-service/common"
+	"go.signoz.io/signoz/pkg/query-service/constants"
 	"go.signoz.io/signoz/pkg/query-service/interfaces"
 	"go.signoz.io/signoz/pkg/query-service/model"
 	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@@ -62,6 +65,23 @@ func (n *NodesRepo) GetNodeAttributeKeys(ctx context.Context, req v3.FilterAttri
 	return attributeKeysResponse, nil
 }
 
+// DidSendNodeMetrics reports whether the time series table named by
+// constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME has at least one row for any
+// of the metrics in nodeMetricNamesToCheck.
+func (n *NodesRepo) DidSendNodeMetrics(ctx context.Context) (bool, error) {
+	namesStr := "'" + strings.Join(nodeMetricNamesToCheck, "','") + "'"
+
+	query := fmt.Sprintf(didSendNodeMetricsQuery,
+		constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME, namesStr)
+
+	count, err := n.reader.GetCountOfThings(ctx, query)
+	if err != nil {
+		return false, err
+	}
+
+	return count > 0, nil
+}
+
 func (n *NodesRepo) GetNodeAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
 	req.DataSource = v3.DataSourceMetrics
 	req.AggregateAttribute = metricToUseForNodes
diff --git a/pkg/query-service/app/inframetrics/pods.go b/pkg/query-service/app/inframetrics/pods.go
index 8f970c6b50e..ef6636e5a94 100644
--- a/pkg/query-service/app/inframetrics/pods.go
+++ b/pkg/query-service/app/inframetrics/pods.go
@@ -2,11 +2,14 @@ package inframetrics
 
 import (
 	"context"
+	"fmt"
 	"math"
 	"sort"
+	"strings"
 
 	"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
 	"go.signoz.io/signoz/pkg/query-service/common"
+	"go.signoz.io/signoz/pkg/query-service/constants"
 	"go.signoz.io/signoz/pkg/query-service/interfaces"
 	"go.signoz.io/signoz/pkg/query-service/model"
 	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@@ -105,6 +108,121 @@ func (p *PodsRepo) GetPodAttributeValues(ctx context.Context, req v3.FilterAttri
 	return attributeValuesResponse, nil
 }
 
+// DidSendPodMetrics reports whether the time series table named by
+// constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME has at least one row for any
+// of the metrics in podMetricNamesToCheck.
+func (p *PodsRepo) DidSendPodMetrics(ctx context.Context) (bool, error) {
+	namesStr := "'" + strings.Join(podMetricNamesToCheck, "','") + "'"
+
+	query := fmt.Sprintf(didSendPodMetricsQuery,
+		constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME, namesStr)
+
+	count, err := p.reader.GetCountOfThings(ctx, query)
+	if err != nil {
+		return false, err
+	}
+
+	return count > 0, nil
+}
+
+// DidSendClusterMetrics reports whether the time series table named by
+// constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME has at least one row for any
+// of the metrics in clusterMetricNamesToCheck.
+func (p *PodsRepo) DidSendClusterMetrics(ctx context.Context) (bool, error) {
+	namesStr := "'" + strings.Join(clusterMetricNamesToCheck, "','") + "'"
+
+	query := fmt.Sprintf(didSendClusterMetricsQuery,
+		constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME, namesStr)
+
+	count, err := p.reader.GetCountOfThings(ctx, query)
+	if err != nil {
+		return false, err
+	}
+
+	return count > 0, nil
+}
+
+// IsSendingOptionalPodMetrics reports whether the time series table named by
+// constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME has at least one row for any
+// of the request/limit metrics in optionalPodMetricNamesToCheck.
+func (p *PodsRepo) IsSendingOptionalPodMetrics(ctx context.Context) (bool, error) {
+	namesStr := "'" + strings.Join(optionalPodMetricNamesToCheck, "','") + "'"
+
+	query := fmt.Sprintf(isSendingOptionalPodMetricsQuery,
+		constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME, namesStr)
+
+	count, err := p.reader.GetCountOfThings(ctx, query)
+	if err != nil {
+		return false, err
+	}
+
+	return count > 0, nil
+}
+
+// SendingRequiredMetadata runs isSendingRequiredMetadataQuery and returns one
+// entry per result row (pod) that is missing required metadata: the cluster,
+// node or namespace name, or all of the deployment, statefulset, daemonset,
+// cronjob and job names. Rows with complete metadata are omitted, so an empty
+// slice means no pod was found lacking metadata.
+func (p *PodsRepo) SendingRequiredMetadata(ctx context.Context) ([]model.PodOnboardingStatus, error) {
+	namesStr := "'" + strings.Join(podMetricNamesToCheck, "','") + "'"
+
+	query := fmt.Sprintf(isSendingRequiredMetadataQuery,
+		constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_V4_TABLENAME, namesStr)
+
+	result, err := p.reader.GetListResultV3(ctx, query)
+	if err != nil {
+		return nil, err
+	}
+
+	// non-nil on purpose: an empty result should stay an empty list, not nil
+	statuses := []model.PodOnboardingStatus{}
+
+	// for each pod, check if we have all the required metadata
+	for _, row := range result {
+		status := model.PodOnboardingStatus{
+			ClusterName:   onboardingLabel(row.Data["k8s_cluster_name"]),
+			NodeName:      onboardingLabel(row.Data["k8s_node_name"]),
+			NamespaceName: onboardingLabel(row.Data["k8s_namespace_name"]),
+			PodName:       onboardingLabel(row.Data["k8s_pod_name"]),
+		}
+		// a label counts as present only when its value is non-empty
+		status.HasClusterName = status.ClusterName != ""
+		status.HasNodeName = status.NodeName != ""
+		status.HasNamespaceName = status.NamespaceName != ""
+		status.HasDeploymentName = onboardingLabel(row.Data["k8s_deployment_name"]) != ""
+		status.HasStatefulsetName = onboardingLabel(row.Data["k8s_statefulset_name"]) != ""
+		status.HasDaemonsetName = onboardingLabel(row.Data["k8s_daemonset_name"]) != ""
+		status.HasCronjobName = onboardingLabel(row.Data["k8s_cronjob_name"]) != ""
+		status.HasJobName = onboardingLabel(row.Data["k8s_job_name"]) != ""
+
+		// a pod must carry the name of at least one workload kind
+		hasWorkload := status.HasDeploymentName || status.HasStatefulsetName ||
+			status.HasDaemonsetName || status.HasCronjobName || status.HasJobName
+
+		if !status.HasClusterName || !status.HasNodeName || !status.HasNamespaceName || !hasWorkload {
+			statuses = append(statuses, status)
+		}
+	}
+
+	return statuses, nil
+}
+
+// onboardingLabel converts a label column value from a query result row to a
+// string. It accepts string and *string; a nil pointer, a missing column or
+// any other type yields "", which callers treat as "label not present".
+func onboardingLabel(value interface{}) string {
+	switch v := value.(type) {
+	case string:
+		return v
+	case *string:
+		if v != nil {
+			return *v
+		}
+	}
+	return ""
+}
+
 func (p *PodsRepo) getMetadataAttributes(ctx context.Context, req model.PodListRequest) (map[string]map[string]string, error) {
 	podAttrs := map[string]map[string]string{}
 
diff --git a/pkg/query-service/model/infra.go b/pkg/query-service/model/infra.go
index 19670c42dfb..3aab227401d 100644
--- a/pkg/query-service/model/infra.go
+++ b/pkg/query-service/model/infra.go
@@ -731,3 +731,31 @@ type VolumeListRecord struct {
 	VolumeUsage float64           `json:"volumeUsage"`
 	Meta        map[string]string `json:"meta"`
 }
+
+// PodOnboardingStatus records, for one pod, the identifying label values
+// found on its metrics and which of the expected labels were present.
+type PodOnboardingStatus struct {
+	ClusterName        string `json:"clusterName"`
+	NodeName           string `json:"nodeName"`
+	NamespaceName      string `json:"namespaceName"`
+	PodName            string `json:"podName"`
+	HasClusterName     bool   `json:"hasClusterName"`
+	HasNodeName        bool   `json:"hasNodeName"`
+	HasNamespaceName   bool   `json:"hasNamespaceName"`
+	HasDeploymentName  bool   `json:"hasDeploymentName"`
+	HasStatefulsetName bool   `json:"hasStatefulsetName"`
+	HasDaemonsetName   bool   `json:"hasDaemonsetName"`
+	HasCronjobName     bool   `json:"hasCronjobName"`
+	HasJobName         bool   `json:"hasJobName"`
+}
+
+// OnboardingStatus is the payload of the k8s infra onboarding status
+// endpoint. Despite its name, IsSendingRequiredMetadata lists only the pods
+// that are missing required metadata, so an empty list is the healthy state.
+type OnboardingStatus struct {
+	DidSendPodMetrics           bool                  `json:"didSendPodMetrics"`
+	DidSendNodeMetrics          bool                  `json:"didSendNodeMetrics"`
+	DidSendClusterMetrics       bool                  `json:"didSendClusterMetrics"`
+	IsSendingOptionalPodMetrics bool                  `json:"isSendingOptionalPodMetrics"`
+	IsSendingRequiredMetadata   []PodOnboardingStatus `json:"isSendingRequiredMetadata"`
+}