Skip to content

Commit

Permalink
chore: add k8s metrics receiving status
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Jan 29, 2025
1 parent 6dfea14 commit cb6db04
Show file tree
Hide file tree
Showing 6 changed files with 304 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,9 @@ func (aH *APIHandler) RegisterInfraMetricsRoutes(router *mux.Router, am *AuthMid
jobsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getJobAttributeKeys)).Methods(http.MethodGet)
jobsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getJobAttributeValues)).Methods(http.MethodGet)
jobsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getJobList)).Methods(http.MethodPost)

infraOnboardingSubRouter := router.PathPrefix("/api/v1/infra_onboarding").Subrouter()
infraOnboardingSubRouter.HandleFunc("/k8s/status", am.ViewAccess(aH.getK8sInfraOnboardingStatus)).Methods(http.MethodGet)
}

func (aH *APIHandler) RegisterWebSocketPaths(router *mux.Router, am *AuthMiddleware) {
Expand Down
49 changes: 49 additions & 0 deletions pkg/query-service/app/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,3 +597,52 @@ func (aH *APIHandler) getPvcAttributeValues(w http.ResponseWriter, r *http.Reque

aH.Respond(w, values)
}

// getK8sInfraOnboardingStatus writes the Kubernetes infra-metrics onboarding
// status for the current request.
//
// The pod-metrics check runs first. When it reports that no pod metrics have
// been received, a zero-valued model.OnboardingStatus is sent back and none of
// the remaining checks run. Otherwise the cluster, node, optional-pod-metric
// and required-metadata checks are executed in that order and their results
// are returned together. An error from any check ends the request with a
// model.ErrorInternal API error and no status payload.
func (aH *APIHandler) getK8sInfraOnboardingStatus(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	// Every repository failure is reported the same way, so build the
	// response in one place.
	respondInternalError := func(cause error) {
		RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: cause}, nil)
	}

	var (
		status model.OnboardingStatus
		err    error
	)

	if status.DidSendPodMetrics, err = aH.podsRepo.DidSendPodMetrics(ctx); err != nil {
		respondInternalError(err)
		return
	}

	// Without pod metrics the other checks are skipped; at this point the
	// status still holds only zero values.
	if !status.DidSendPodMetrics {
		aH.Respond(w, status)
		return
	}

	if status.DidSendClusterMetrics, err = aH.podsRepo.DidSendClusterMetrics(ctx); err != nil {
		respondInternalError(err)
		return
	}

	if status.DidSendNodeMetrics, err = aH.nodesRepo.DidSendNodeMetrics(ctx); err != nil {
		respondInternalError(err)
		return
	}

	if status.IsSendingOptionalPodMetrics, err = aH.podsRepo.IsSendingOptionalPodMetrics(ctx); err != nil {
		respondInternalError(err)
		return
	}

	if status.IsSendingRequiredMetadata, err = aH.podsRepo.SendingRequiredMetadata(ctx); err != nil {
		respondInternalError(err)
		return
	}

	aH.Respond(w, status)
}
78 changes: 78 additions & 0 deletions pkg/query-service/app/inframetrics/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,84 @@ import (
"go.signoz.io/signoz/pkg/query-service/model"
)

var (
// TODO(srikanthccv): import metadata yaml from receivers and use generated files to check the metrics

// podMetricNamesToCheck is the list of metric names matched by the pod-metrics
// check (DidSendPodMetrics) and by the required-metadata query
// (SendingRequiredMetadata).
podMetricNamesToCheck = []string{
"k8s_pod_cpu_utilization",
"k8s_pod_memory_usage",
"k8s_pod_cpu_request_utilization",
"k8s_pod_memory_request_utilization",
"k8s_pod_cpu_limit_utilization",
"k8s_pod_memory_limit_utilization",
"k8s_container_restarts",
"k8s_pod_phase",
}
// nodeMetricNamesToCheck is the list of metric names matched by the
// node-metrics check (DidSendNodeMetrics).
nodeMetricNamesToCheck = []string{
"k8s_node_cpu_utilization",
"k8s_node_allocatable_cpu",
"k8s_node_memory_usage",
"k8s_node_allocatable_memory",
"k8s_node_condition_ready",
}
// clusterMetricNamesToCheck is the list of metric names matched by the
// cluster-metrics check (DidSendClusterMetrics). It covers daemonset,
// deployment, job and statefulset metrics.
clusterMetricNamesToCheck = []string{
"k8s_daemonset_desired_scheduled_nodes",
"k8s_daemonset_current_scheduled_nodes",
"k8s_deployment_desired",
"k8s_deployment_available",
"k8s_job_desired_successful_pods",
"k8s_job_active_pods",
"k8s_job_failed_pods",
"k8s_job_successful_pods",
"k8s_statefulset_desired_pods",
"k8s_statefulset_current_pods",
}
// optionalPodMetricNamesToCheck is the request/limit utilization subset of
// podMetricNamesToCheck, matched by IsSendingOptionalPodMetrics.
optionalPodMetricNamesToCheck = []string{
"k8s_pod_cpu_request_utilization",
"k8s_pod_memory_request_utilization",
"k8s_pod_cpu_limit_utilization",
"k8s_pod_memory_limit_utilization",
}

// The four count queries below share one template. Callers fill the three %s
// placeholders, in order, with the database name, the table name and a
// single-quoted, comma-separated list of metric names.

// did they ever send _any_ pod metrics?
didSendPodMetricsQuery = `
SELECT count() FROM %s.%s WHERE metric_name IN (%s)
`

// did they ever send any node metrics?
didSendNodeMetricsQuery = `
SELECT count() FROM %s.%s WHERE metric_name IN (%s)
`

// did they ever send any cluster metrics?
didSendClusterMetricsQuery = `
SELECT count() FROM %s.%s WHERE metric_name IN (%s)
`

// if they ever sent _any_ pod metrics, we assume they know how to send pod metrics
// now, are they sending optional pod metrics such as request/limit metrics?
isSendingOptionalPodMetricsQuery = `
SELECT count() FROM %s.%s WHERE metric_name IN (%s)
`

// there should be [cluster, node, namespace, one of (deployment, statefulset, daemonset, cronjob, job)] for each pod
//
// The query takes the same three placeholders as the count queries above. It
// only considers rows from the last 60 minutes, ignores the kube-system,
// kube-public, kube-node-lease and metallb-system namespaces, groups by pod
// name, and keeps a single row per (cluster, node, namespace) combination.
isSendingRequiredMetadataQuery = `
SELECT any(JSONExtractString(labels, 'k8s_cluster_name')) as k8s_cluster_name,
any(JSONExtractString(labels, 'k8s_node_name')) as k8s_node_name,
any(JSONExtractString(labels, 'k8s_namespace_name')) as k8s_namespace_name,
any(JSONExtractString(labels, 'k8s_deployment_name')) as k8s_deployment_name,
any(JSONExtractString(labels, 'k8s_statefulset_name')) as k8s_statefulset_name,
any(JSONExtractString(labels, 'k8s_daemonset_name')) as k8s_daemonset_name,
any(JSONExtractString(labels, 'k8s_cronjob_name')) as k8s_cronjob_name,
any(JSONExtractString(labels, 'k8s_job_name')) as k8s_job_name,
JSONExtractString(labels, 'k8s_pod_name') as k8s_pod_name
FROM %s.%s WHERE metric_name IN (%s)
AND (unix_milli >= (toUnixTimestamp(now() - toIntervalMinute(60)) * 1000))
AND JSONExtractString(labels, 'k8s_namespace_name') NOT IN ('kube-system', 'kube-public', 'kube-node-lease', 'metallb-system')
GROUP BY k8s_pod_name
LIMIT 1 BY k8s_cluster_name, k8s_node_name, k8s_namespace_name
`
)

// getParamsForTopItems returns the step, time series table name and samples table name
// for the top items query. what are we doing here?
// we want to identify the top hosts/pods/nodes quickly, so we use pre-aggregated data
Expand Down
17 changes: 17 additions & 0 deletions pkg/query-service/app/inframetrics/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package inframetrics

import (
"context"
"fmt"
"math"
"sort"
"strings"

"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
Expand Down Expand Up @@ -62,6 +65,20 @@ func (n *NodesRepo) GetNodeAttributeKeys(ctx context.Context, req v3.FilterAttri
return attributeKeysResponse, nil
}

// DidSendNodeMetrics reports whether the table named by
// constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME holds at least one row whose
// metric_name is one of nodeMetricNamesToCheck. It returns false together with
// the reader's error when the count query fails.
func (n *NodesRepo) DidSendNodeMetrics(ctx context.Context) (bool, error) {
	// Render the names as a quoted SQL list: 'a','b','c'.
	quotedNames := fmt.Sprintf("'%s'", strings.Join(nodeMetricNamesToCheck, "','"))

	stmt := fmt.Sprintf(
		didSendNodeMetricsQuery,
		constants.SIGNOZ_METRIC_DBNAME,
		constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME,
		quotedNames,
	)

	matches, err := n.reader.GetCountOfThings(ctx, stmt)
	if err != nil {
		return false, err
	}
	return matches > 0, nil
}

func (n *NodesRepo) GetNodeAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForNodes
Expand Down
134 changes: 134 additions & 0 deletions pkg/query-service/app/inframetrics/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package inframetrics

import (
"context"
"fmt"
"math"
"sort"
"strings"

"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
Expand Down Expand Up @@ -105,6 +108,137 @@ func (p *PodsRepo) GetPodAttributeValues(ctx context.Context, req v3.FilterAttri
return attributeValuesResponse, nil
}

// DidSendPodMetrics reports whether the table named by
// constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME holds at least one row whose
// metric_name is one of podMetricNamesToCheck. It returns false together with
// the reader's error when the count query fails.
func (p *PodsRepo) DidSendPodMetrics(ctx context.Context) (bool, error) {
	// Render the names as a quoted SQL list: 'a','b','c'.
	quotedNames := fmt.Sprintf("'%s'", strings.Join(podMetricNamesToCheck, "','"))

	stmt := fmt.Sprintf(
		didSendPodMetricsQuery,
		constants.SIGNOZ_METRIC_DBNAME,
		constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME,
		quotedNames,
	)

	matches, err := p.reader.GetCountOfThings(ctx, stmt)
	if err != nil {
		return false, err
	}
	return matches > 0, nil
}

// DidSendClusterMetrics reports whether the table named by
// constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME holds at least one row whose
// metric_name is one of clusterMetricNamesToCheck. It returns false together
// with the reader's error when the count query fails.
func (p *PodsRepo) DidSendClusterMetrics(ctx context.Context) (bool, error) {
	// Render the names as a quoted SQL list: 'a','b','c'.
	quotedNames := fmt.Sprintf("'%s'", strings.Join(clusterMetricNamesToCheck, "','"))

	stmt := fmt.Sprintf(
		didSendClusterMetricsQuery,
		constants.SIGNOZ_METRIC_DBNAME,
		constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME,
		quotedNames,
	)

	matches, err := p.reader.GetCountOfThings(ctx, stmt)
	if err != nil {
		return false, err
	}
	return matches > 0, nil
}

// IsSendingOptionalPodMetrics reports whether the table named by
// constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME holds at least one row whose
// metric_name is one of optionalPodMetricNamesToCheck (the request/limit
// utilization metrics). It returns false together with the reader's error when
// the count query fails.
func (p *PodsRepo) IsSendingOptionalPodMetrics(ctx context.Context) (bool, error) {
	// Render the names as a quoted SQL list: 'a','b','c'.
	quotedNames := fmt.Sprintf("'%s'", strings.Join(optionalPodMetricNamesToCheck, "','"))

	stmt := fmt.Sprintf(
		isSendingOptionalPodMetricsQuery,
		constants.SIGNOZ_METRIC_DBNAME,
		constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME,
		quotedNames,
	)

	matches, err := p.reader.GetCountOfThings(ctx, stmt)
	if err != nil {
		return false, err
	}
	return matches > 0, nil
}

// SendingRequiredMetadata returns the pods that are missing required
// Kubernetes metadata.
//
// It runs isSendingRequiredMetadataQuery against the table named by
// constants.SIGNOZ_TIMESERIES_V4_TABLENAME for the metrics listed in
// podMetricNamesToCheck. A row is included in the result when it lacks a
// cluster name, a node name, a namespace name, or all of the workload names
// (deployment, statefulset, daemonset, cronjob, job). Rows that carry all the
// required metadata are omitted, so an empty (non-nil) slice means nothing was
// found missing.
//
// A label counts as present only when its value is a non-empty string. The
// query reads labels with JSONExtractString, so an absent label is expected to
// arrive as "" rather than as a missing column.
//
// On a query failure it returns a nil slice and the reader's error.
func (p *PodsRepo) SendingRequiredMetadata(ctx context.Context) ([]model.PodOnboardingStatus, error) {
	namesStr := "'" + strings.Join(podMetricNamesToCheck, "','") + "'"

	query := fmt.Sprintf(isSendingRequiredMetadataQuery,
		constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_V4_TABLENAME, namesStr)

	result, err := p.reader.GetListResultV3(ctx, query)
	if err != nil {
		return nil, err
	}

	// labelValue normalizes a scanned column into (value, present). The column
	// may arrive as a string or a *string; an empty string, a nil pointer or
	// any other type is reported as not present so that both representations
	// are judged by the same rule.
	labelValue := func(raw interface{}) (string, bool) {
		switch v := raw.(type) {
		case string:
			return v, v != ""
		case *string:
			if v == nil {
				return "", false
			}
			return *v, *v != ""
		}
		return "", false
	}

	statuses := []model.PodOnboardingStatus{}

	// for each pod, check if we have all the required metadata
	for _, row := range result {
		status := model.PodOnboardingStatus{}

		status.ClusterName, status.HasClusterName = labelValue(row.Data["k8s_cluster_name"])
		status.NodeName, status.HasNodeName = labelValue(row.Data["k8s_node_name"])
		status.NamespaceName, status.HasNamespaceName = labelValue(row.Data["k8s_namespace_name"])

		// Only the presence of the workload names is reported, not their values.
		_, status.HasDeploymentName = labelValue(row.Data["k8s_deployment_name"])
		_, status.HasStatefulsetName = labelValue(row.Data["k8s_statefulset_name"])
		_, status.HasDaemonsetName = labelValue(row.Data["k8s_daemonset_name"])
		_, status.HasCronjobName = labelValue(row.Data["k8s_cronjob_name"])
		_, status.HasJobName = labelValue(row.Data["k8s_job_name"])

		status.PodName, _ = labelValue(row.Data["k8s_pod_name"])

		// A pod needs at least one owning workload name.
		hasWorkload := status.HasDeploymentName ||
			status.HasStatefulsetName ||
			status.HasDaemonsetName ||
			status.HasCronjobName ||
			status.HasJobName

		if !status.HasClusterName ||
			!status.HasNodeName ||
			!status.HasNamespaceName ||
			!hasWorkload {
			statuses = append(statuses, status)
		}
	}

	return statuses, nil
}

func (p *PodsRepo) getMetadataAttributes(ctx context.Context, req model.PodListRequest) (map[string]map[string]string, error) {
podAttrs := map[string]map[string]string{}

Expand Down
23 changes: 23 additions & 0 deletions pkg/query-service/model/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,3 +731,26 @@ type VolumeListRecord struct {
VolumeUsage float64 `json:"volumeUsage"`
Meta map[string]string `json:"meta"`
}

// PodOnboardingStatus describes which Kubernetes metadata was found for a
// single pod during the onboarding metadata check.
type PodOnboardingStatus struct {
// Identifying values read from the pod's labels; empty when not available.
ClusterName string `json:"clusterName"`
NodeName string `json:"nodeName"`
NamespaceName string `json:"namespaceName"`
PodName string `json:"podName"`
// Presence flags for the cluster, node and namespace labels.
HasClusterName bool `json:"hasClusterName"`
HasNodeName bool `json:"hasNodeName"`
HasNamespaceName bool `json:"hasNamespaceName"`
// Presence flags for the workload labels. The metadata check expects at
// least one of these to be set for every pod.
HasDeploymentName bool `json:"hasDeploymentName"`
HasStatefulsetName bool `json:"hasStatefulsetName"`
HasDaemonsetName bool `json:"hasDaemonsetName"`
HasCronjobName bool `json:"hasCronjobName"`
HasJobName bool `json:"hasJobName"`
}

// OnboardingStatus is the payload of the Kubernetes infra onboarding status
// endpoint. It summarizes which groups of Kubernetes metrics have been
// received.
type OnboardingStatus struct {
// Whether any of the checked pod, node and cluster metrics were found.
DidSendPodMetrics bool `json:"didSendPodMetrics"`
DidSendNodeMetrics bool `json:"didSendNodeMetrics"`
DidSendClusterMetrics bool `json:"didSendClusterMetrics"`
// Whether any of the optional pod request/limit metrics were found.
IsSendingOptionalPodMetrics bool `json:"isSendingOptionalPodMetrics"`
// Despite the name, this holds the pods that are MISSING required
// metadata; pods with complete metadata are not listed.
IsSendingRequiredMetadata []PodOnboardingStatus `json:"isSendingRequiredMetadata"`
}

0 comments on commit cb6db04

Please sign in to comment.