Commit d80d4f1
- Add etcd metrics collection
- Add Golang process metrics collection
- Improve the README and dashboards
ning1875 committed Jan 26, 2021
1 parent 5ef1bb0 commit d80d4f1
Showing 23 changed files with 1,127 additions and 277 deletions.
12 changes: 6 additions & 6 deletions collect/common.go
@@ -389,12 +389,12 @@ func GetServerSideAddr(cg *config.CommonApiServerConfig, logger log.Logger, data
return
}

func AsyncCurlMetricsAndPush(controlChan chan int, c *config.CommonApiServerConfig, logger log.Logger, funcName string, m map[string]string, step int64, tw int64, index int, allNum int, serverSideNid string, pushServerAddr string) {
func AsyncCurlMetricsAndPush(controlChan chan int, c *config.CommonApiServerConfig, logger log.Logger, funcName string, m map[string]string, step int64, tw int64, index int, allNum int, serverSideNid string, pushServerAddr string, dropBucket bool) {
start := time.Now()
defer func() {
<-controlChan
}()
metricList, err := CurlTlsMetricsApi(logger, funcName, c, m, step, tw)
metricList, err := CurlTlsMetricsApi(logger, funcName, c, m, step, tw, dropBucket)

if err != nil {
level.Error(logger).Log("msg", "CurlTlsMetricsResError", "func_name", funcName, "err:", err, "seq", fmt.Sprintf("%d/%d", index, allNum), "addr", c.Addr)
@@ -465,7 +465,7 @@ func sum64(hash [md5.Size]byte) uint64 {
return s
}

func ConcurrencyCurlMetricsByIpsSetNid(cg *config.CommonApiServerConfig, logger log.Logger, dataMap *HistoryMap, funcName string, appendTags map[string]string, step int64, tw int64, multiServerInstanceUniqueLabel string, multiFuncUniqueLabel string, serverSideNid string, pushServerAddr string) {
func ConcurrencyCurlMetricsByIpsSetNid(cg *config.CommonApiServerConfig, logger log.Logger, dataMap *HistoryMap, funcName string, appendTags map[string]string, step int64, tw int64, multiServerInstanceUniqueLabel string, multiFuncUniqueLabel string, serverSideNid string, pushServerAddr string, dropBucket bool) {
metricUrlMap := GetServerSideAddr(cg, logger, dataMap, funcName)
if len(metricUrlMap) == 0 {
level.Error(logger).Log("msg", "GetServerSideAddrEmpty", "funcName:", funcName)
@@ -509,13 +509,13 @@ func ConcurrencyCurlMetricsByIpsSetNid(cg *config.CommonApiServerConfig, logger
for k, v := range appendTags {
m[k] = v
}
go AsyncCurlMetricsAndPush(controlChan, c, logger, funcName, m, step, tw, seq+1, len(metricUrlMap), serverSideNid, pushServerAddr)
go AsyncCurlMetricsAndPush(controlChan, c, logger, funcName, m, step, tw, seq+1, len(metricUrlMap), serverSideNid, pushServerAddr, dropBucket)
seq += 1
}
return
}

func CurlTlsMetricsApi(logger log.Logger, funcName string, cg *config.CommonApiServerConfig, appendTagsM map[string]string, step int64, timeout int64) ([]dataobj.MetricValue, error) {
func CurlTlsMetricsApi(logger log.Logger, funcName string, cg *config.CommonApiServerConfig, appendTagsM map[string]string, step int64, timeout int64, dropBucket bool) ([]dataobj.MetricValue, error) {
//start := time.Now()
client, err := config_util.NewClientFromConfig(cg.HTTPClientConfig, funcName, false, false)
if err != nil {
@@ -541,6 +541,6 @@ func CurlTlsMetricsApi(logger log.Logger, funcName string, cg *config.CommonApiS
return nil, err
}

metrics, err := ParseCommon(bodyBytes, MapWhiteMetricsMap(cg.MetricsWhiteList), appendTagsM, step, logger)
metrics, err := ParseCommon(bodyBytes, MapWhiteMetricsMap(cg.MetricsWhiteList), appendTagsM, step, logger, dropBucket)
return metrics, err
}
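
Note on the new dropBucket flag: it is threaded from ConcurrencyCurlMetricsByIpsSetNid through AsyncCurlMetricsAndPush and CurlTlsMetricsApi into ParseCommon, but this hunk does not show how ParseCommon consumes it. A plausible reading is that it lets collectors which do not aggregate histogram buckets themselves discard the raw _bucket series to keep cardinality down. The following is a minimal sketch of that kind of filter; dropBucketSeries is a hypothetical helper, not code from this commit.

package collect

import (
	"strings"

	"github.com/didi/nightingale/src/common/dataobj"
)

// dropBucketSeries returns the metric slice unchanged when dropBucket is
// false, and otherwise removes raw histogram series whose name ends in
// "_bucket" while keeping everything else.
func dropBucketSeries(metrics []dataobj.MetricValue, dropBucket bool) []dataobj.MetricValue {
	if !dropBucket {
		return metrics
	}
	kept := metrics[:0]
	for _, m := range metrics {
		if strings.HasSuffix(m.Metric, "_bucket") {
			continue
		}
		kept = append(kept, m)
	}
	return kept
}
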
13 changes: 13 additions & 0 deletions collect/get_pod.go
@@ -41,6 +41,7 @@ func GetServerAddrByGetPod(logger log.Logger, dataMap *HistoryMap) {
kubeSchedulerIps := make([]string, 0)
kubeControllerIps := make([]string, 0)
apiServerIps := make([]string, 0)
etcdIps := make([]string, 0)
coreDnsIps := make([]string, 0)
kubeProxyIps := make([]string, 0)
if len(pods.Items) == 0 {
@@ -71,6 +72,14 @@ func GetServerAddrByGetPod(logger log.Logger, dataMap *HistoryMap) {

}

if p.Labels["tier"] == "control-plane" && p.Labels["component"] == "etcd" {
ip := p.Status.PodIP
if ip != "" {
etcdIps = append(etcdIps, p.Status.PodIP)
}

}

if p.Labels["k8s-app"] == "kube-dns" {
ip := p.Status.PodIP
if ip != "" {
@@ -93,6 +102,7 @@ func GetServerAddrByGetPod(logger log.Logger, dataMap *HistoryMap) {
"num_apiServerIps", len(apiServerIps),
"num_coreDnsIps", len(coreDnsIps),
"num_kubeProxyIps", len(kubeProxyIps),
"num_etcdIps", len(etcdIps),
"time_took_seconds", time.Since(start).Seconds(),
)
if len(coreDnsIps) > 0 {
Expand All @@ -101,6 +111,9 @@ func GetServerAddrByGetPod(logger log.Logger, dataMap *HistoryMap) {
if len(apiServerIps) > 0 {
dataMap.Map.Store(kconfig.FUNCNAME_APISERVER, apiServerIps)
}
if len(etcdIps) > 0 {
dataMap.Map.Store(kconfig.FUNCNAME_ETCD, etcdIps)
}
if len(kubeSchedulerIps) > 0 {
dataMap.Map.Store(kconfig.FUNCNAME_KUBESCHEDULER, kubeSchedulerIps)
}
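
The etcd discovery added above reuses the existing pattern: walk the pod list once and match the tier=control-plane and component=etcd labels, skipping pods that do not yet have a PodIP. An equivalent approach is to let the API server do the filtering with a label selector. The sketch below assumes client-go v0.18 or later (context-aware List) and a clientset and namespace that are not part of this diff.

package collect

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listEtcdPodIPs is a hypothetical alternative to the in-loop label checks:
// the API server returns only pods labeled tier=control-plane,component=etcd,
// and pods without a PodIP yet are skipped, mirroring the ip != "" guard above.
func listEtcdPodIPs(ctx context.Context, clientset kubernetes.Interface, namespace string) ([]string, error) {
	pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: "tier=control-plane,component=etcd",
	})
	if err != nil {
		return nil, err
	}
	ips := make([]string, 0, len(pods.Items))
	for _, p := range pods.Items {
		if ip := p.Status.PodIP; ip != "" {
			ips = append(ips, ip)
		}
	}
	return ips, nil
}

The commit's single-pass loop remains the better fit here, since the same pod list also feeds the scheduler, controller-manager, apiserver, coredns, and kube-proxy discovery.
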
43 changes: 37 additions & 6 deletions collect/kube_apiserver.go
@@ -34,11 +34,15 @@ func DoApiServerCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMa
apiserver_response_sizes_bucket_m := make(map[float64]float64)
apiserver_response_sizes_bucket := "apiserver_response_sizes_bucket"

workqueue_queue_duration_seconds_bucket_m := make(map[float64]float64)
// shared metrics
rest_client_request_duration_seconds_bucket := "rest_client_request_duration_seconds_bucket"
rest_client_request_duration_seconds_bucket_m := make(map[float64]float64)

workqueue_queue_duration_seconds_bucket := "workqueue_queue_duration_seconds_bucket"
workqueue_queue_duration_seconds_bucket_m := make(map[float64]float64)

workqueue_work_duration_seconds_bucket_m := make(map[float64]float64)
workqueue_work_duration_seconds_bucket := "workqueue_work_duration_seconds_bucket"
workqueue_work_duration_seconds_bucket_m := make(map[float64]float64)

etcd_request_duration_seconds_bucket_m := make(map[float64]float64)
etcd_request_duration_seconds_bucket := "etcd_request_duration_seconds_bucket"
@@ -56,6 +60,9 @@ func DoApiServerCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMa
apiserver_response_sizes_sum := "apiserver_response_sizes_sum"
apiserver_response_sizes_count := "apiserver_response_sizes_count"

rest_client_request_duration_seconds_sum := "rest_client_request_duration_seconds_sum"
rest_client_request_duration_seconds_count := "rest_client_request_duration_seconds_count"

workqueue_queue_duration_seconds_sum := "workqueue_queue_duration_seconds_sum"
workqueue_queue_duration_seconds_count := "workqueue_queue_duration_seconds_count"

@@ -80,7 +87,7 @@ func DoApiServerCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMa
for k, v := range cg.AppendTags {
newtagsm[k] = v
}
metrics, err := CurlTlsMetricsApi(logger, funcName, c, newtagsm, cg.Step, cg.TimeOutSeconds)
metrics, err := CurlTlsMetricsApi(logger, funcName, c, newtagsm, cg.Step, cg.TimeOutSeconds, false)

if err != nil {
level.Error(logger).Log("msg", "CurlTlsMetricsResError", "func_name", funcName, "err:", err, "seq", fmt.Sprintf("%d/%d", index, allNum), "addr", c.Addr)
@@ -129,6 +136,12 @@ func DoApiServerCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMa
upperBoundV, _ := strconv.ParseFloat(upperBound, 64)
apiserver_response_sizes_bucket_m[upperBoundV] += metric.Value
continue
case rest_client_request_duration_seconds_bucket:

upperBound := metric.TagsMap["le"]
upperBoundV, _ := strconv.ParseFloat(upperBound, 64)
rest_client_request_duration_seconds_bucket_m[upperBoundV] += metric.Value
continue
case workqueue_queue_duration_seconds_bucket:

upperBound := metric.TagsMap["le"]
@@ -212,6 +225,24 @@ func DoApiServerCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMa
im["count"] += metric.Value
avg_m[newName] = im

// shared metrics
case rest_client_request_duration_seconds_sum:
newName := strings.Split(metric.Metric, "_sum")[0]
im, loaded := avg_m[newName]
if !loaded {
im = make(map[string]float64)
}
im["sum"] += metric.Value
avg_m[newName] = im

case rest_client_request_duration_seconds_count:
newName := strings.Split(metric.Metric, "_count")[0]
im, loaded := avg_m[newName]
if !loaded {
im = make(map[string]float64)
}
im["count"] += metric.Value
avg_m[newName] = im
case workqueue_queue_duration_seconds_sum:
newName := strings.Split(metric.Metric, "_sum")[0]
im, loaded := avg_m[newName]
@@ -301,9 +332,9 @@ func DoApiServerCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMa

metricList = histogramDeltaWork(dataMap, apiserver_response_sizes_bucket_m, newtagsm, funcName, apiserver_response_sizes_bucket, cg.ServerSideNid, cg.Step, metricList)

metricList = histogramDeltaWork(dataMap, workqueue_queue_duration_seconds_bucket_m, newtagsm, funcName, workqueue_queue_duration_seconds_bucket, cg.ServerSideNid, cg.Step, metricList)

metricList = histogramDeltaWork(dataMap, workqueue_work_duration_seconds_bucket_m, newtagsm, funcName, workqueue_work_duration_seconds_bucket, cg.ServerSideNid, cg.Step, metricList)
metricList = histogramDeltaWork(dataMap, rest_client_request_duration_seconds_bucket_m, newtagsm, funcName, "apiserver_"+rest_client_request_duration_seconds_bucket, cg.ServerSideNid, cg.Step, metricList)
metricList = histogramDeltaWork(dataMap, workqueue_queue_duration_seconds_bucket_m, newtagsm, funcName, "apiserver_"+workqueue_queue_duration_seconds_bucket, cg.ServerSideNid, cg.Step, metricList)
metricList = histogramDeltaWork(dataMap, workqueue_work_duration_seconds_bucket_m, newtagsm, funcName, "apiserver_"+workqueue_work_duration_seconds_bucket, cg.ServerSideNid, cg.Step, metricList)

metricList = histogramDeltaWork(dataMap, etcd_request_duration_seconds_bucket_m, newtagsm, funcName, etcd_request_duration_seconds_bucket, cg.ServerSideNid, cg.Step, metricList)

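
Each *_bucket case above folds samples from all instances into a map[float64]float64 keyed by the le upper bound, and histogramDeltaWork then turns those cumulative bucket counts into quantile series (its body is not part of this diff; the "compute quantiles" comment in kube_controller_manager.go below is the hint). The usual estimate is the same linear interpolation PromQL's histogram_quantile applies. The function below is a self-contained sketch of that calculation, not the project's actual implementation.

package collect

import (
	"math"
	"sort"
)

// bucketQuantile estimates the q-th quantile (0 < q < 1) from cumulative
// histogram buckets keyed by their "le" upper bound, interpolating linearly
// inside the bucket that contains the target rank, the same way PromQL's
// histogram_quantile does.
func bucketQuantile(q float64, buckets map[float64]float64) float64 {
	bounds := make([]float64, 0, len(buckets))
	for le := range buckets {
		bounds = append(bounds, le)
	}
	if len(bounds) == 0 {
		return math.NaN()
	}
	sort.Float64s(bounds)

	total := buckets[bounds[len(bounds)-1]] // counts are cumulative, so the last bucket holds the total
	if total == 0 {
		return math.NaN()
	}
	rank := q * total

	prevBound, prevCount := 0.0, 0.0
	for _, le := range bounds {
		count := buckets[le]
		if count >= rank {
			if math.IsInf(le, +1) {
				return prevBound // rank falls in the +Inf bucket: report the highest finite bound
			}
			if count == prevCount {
				return le // degenerate bucket, avoid dividing by zero
			}
			return prevBound + (le-prevBound)*(rank-prevCount)/(count-prevCount)
		}
		prevBound, prevCount = le, count
	}
	return bounds[len(bounds)-1]
}
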
170 changes: 168 additions & 2 deletions collect/kube_controller_manager.go
@@ -1,13 +1,179 @@
package collect

import (
"fmt"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/go-kit/kit/log"

"github.com/go-kit/kit/log/level"
"github.com/n9e/k8s-mon/config"
"strconv"
"strings"
"time"
)

func DoKubeControllerCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMap, funcName string) {

ConcurrencyCurlMetricsByIpsSetNid(cg.KubeControllerC, logger, dataMap, funcName, cg.AppendTags, cg.Step, cg.TimeOutSeconds, cg.MultiServerInstanceUniqueLabel, cg.MultiFuncUniqueLabel, cg.ServerSideNid, cg.PushServerAddr)
start := time.Now()
metricUrlMap := GetServerSideAddr(cg.KubeControllerC, logger, dataMap, funcName)
if len(metricUrlMap) == 0 {
level.Error(logger).Log("msg", "GetServerSideAddrEmpty", "funcName:", funcName)
return
}

rest_client_request_duration_seconds_bucket := "rest_client_request_duration_seconds_bucket"
rest_client_request_duration_seconds_bucket_m := make(map[float64]float64)

workqueue_queue_duration_seconds_bucket := "workqueue_queue_duration_seconds_bucket"
workqueue_queue_duration_seconds_bucket_m := make(map[float64]float64)

workqueue_work_duration_seconds_bucket := "workqueue_work_duration_seconds_bucket"
workqueue_work_duration_seconds_bucket_m := make(map[float64]float64)

rest_client_request_duration_seconds_sum := "rest_client_request_duration_seconds_sum"
rest_client_request_duration_seconds_count := "rest_client_request_duration_seconds_count"

workqueue_queue_duration_seconds_sum := "workqueue_queue_duration_seconds_sum"
workqueue_queue_duration_seconds_count := "workqueue_queue_duration_seconds_count"

workqueue_work_duration_seconds_sum := "workqueue_work_duration_seconds_sum"
workqueue_work_duration_seconds_count := "workqueue_work_duration_seconds_count"

avg_m := make(map[string]map[string]float64)

var metricList []dataobj.MetricValue
index := 0
allNum := len(metricUrlMap)
for uniqueHost, murl := range metricUrlMap {
tmp := *cg.KubeControllerC
c := &tmp
c.Addr = murl
// add the service_addr tag
newtagsm := map[string]string{
cg.MultiServerInstanceUniqueLabel: uniqueHost,
cg.MultiFuncUniqueLabel: funcName,
}
for k, v := range cg.AppendTags {
newtagsm[k] = v
}
metrics, err := CurlTlsMetricsApi(logger, funcName, c, newtagsm, cg.Step, cg.TimeOutSeconds, false)

if err != nil {
level.Error(logger).Log("msg", "CurlTlsMetricsResError", "func_name", funcName, "err:", err, "seq", fmt.Sprintf("%d/%d", index, allNum), "addr", c.Addr)
continue
}
if len(metrics) == 0 {
level.Error(logger).Log("msg", "CurlTlsMetricsResEmpty", "func_name", funcName, "seq", fmt.Sprintf("%d/%d", index, allNum), "addr", c.Addr)
continue
}

for _, metric := range metrics {

switch metric.Metric {
case rest_client_request_duration_seconds_bucket:

upperBound := metric.TagsMap["le"]
upperBoundV, _ := strconv.ParseFloat(upperBound, 64)
rest_client_request_duration_seconds_bucket_m[upperBoundV] += metric.Value
continue
case workqueue_queue_duration_seconds_bucket:

upperBound := metric.TagsMap["le"]
upperBoundV, _ := strconv.ParseFloat(upperBound, 64)
workqueue_queue_duration_seconds_bucket_m[upperBoundV] += metric.Value
continue
case workqueue_work_duration_seconds_bucket:

upperBound := metric.TagsMap["le"]
upperBoundV, _ := strconv.ParseFloat(upperBound, 64)
workqueue_work_duration_seconds_bucket_m[upperBoundV] += metric.Value
continue
// shared metrics
case rest_client_request_duration_seconds_sum:
newName := strings.Split(metric.Metric, "_sum")[0]
im, loaded := avg_m[newName]
if !loaded {
im = make(map[string]float64)
}
im["sum"] += metric.Value
avg_m[newName] = im

case rest_client_request_duration_seconds_count:
newName := strings.Split(metric.Metric, "_count")[0]
im, loaded := avg_m[newName]
if !loaded {
im = make(map[string]float64)
}
im["count"] += metric.Value
avg_m[newName] = im
case workqueue_queue_duration_seconds_sum:
newName := strings.Split(metric.Metric, "_sum")[0]
im, loaded := avg_m[newName]
if !loaded {
im = make(map[string]float64)
}
im["sum"] += metric.Value
avg_m[newName] = im

case workqueue_queue_duration_seconds_count:
newName := strings.Split(metric.Metric, "_count")[0]
im, loaded := avg_m[newName]
if !loaded {
im = make(map[string]float64)
}
im["count"] += metric.Value
avg_m[newName] = im

case workqueue_work_duration_seconds_sum:
newName := strings.Split(metric.Metric, "_sum")[0]
im, loaded := avg_m[newName]
if !loaded {
im = make(map[string]float64)
}
im["sum"] += metric.Value
avg_m[newName] = im
case workqueue_work_duration_seconds_count:
newName := strings.Split(metric.Metric, "_count")[0]
im, loaded := avg_m[newName]
if !loaded {
im = make(map[string]float64)
}
im["count"] += metric.Value
avg_m[newName] = im
default:
if strings.HasSuffix(metric.Metric, "_bucket") {
continue
}

}
// common
if metric.CounterType == config.METRIC_TYPE_COUNTER {
metric.Metric = metric.Metric + config.COUNTER_TO_GAUGE_METRIC_NAME_SUFFIX
}

metric.Nid = cg.ServerSideNid
metricList = append(metricList, metric)
}
}

newtagsm := map[string]string{
cg.MultiFuncUniqueLabel: funcName,
}
for k, v := range cg.AppendTags {
newtagsm[k] = v
}

// start computing quantiles
metricList = histogramDeltaWork(dataMap, rest_client_request_duration_seconds_bucket_m, newtagsm, funcName, "controller_manager_"+rest_client_request_duration_seconds_bucket, cg.ServerSideNid, cg.Step, metricList)
metricList = histogramDeltaWork(dataMap, workqueue_queue_duration_seconds_bucket_m, newtagsm, funcName, "controller_manager_"+workqueue_queue_duration_seconds_bucket, cg.ServerSideNid, cg.Step, metricList)
metricList = histogramDeltaWork(dataMap, workqueue_work_duration_seconds_bucket_m, newtagsm, funcName, "controller_manager_"+workqueue_work_duration_seconds_bucket, cg.ServerSideNid, cg.Step, metricList)

// start computing averages
for mName, avgm := range avg_m {
mm := avgCompute(avgm, cg.ServerSideNid, mName, cg.Step, newtagsm)
metricList = append(metricList, mm...)

}
level.Info(logger).Log("msg", "DoCollectSuccessfullyReadyToPush", "funcName", funcName, "metrics_num", len(metricList), "time_took_seconds", time.Since(start).Seconds())
go PushWork(cg.PushServerAddr, cg.TimeOutSeconds, metricList, logger, funcName)

}
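
The _sum/_count cases above accumulate per-instance values into avg_m as {"sum": s, "count": c} pairs, and avgCompute (not shown in this diff) presumably emits one averaged series per metric name. Below is a minimal sketch of that division, guarding against a zero count; the helper name is an assumption, and whatever naming convention avgCompute uses for the emitted series is not visible in this hunk.

package collect

// averageFromSumCount is a hypothetical stand-in for the core of avgCompute:
// given the {"sum": s, "count": c} map accumulated above, it returns s/c,
// or 0 when no samples were observed so the division is never by zero.
func averageFromSumCount(m map[string]float64) float64 {
	count := m["count"]
	if count == 0 {
		return 0
	}
	return m["sum"] / count
}
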
2 changes: 1 addition & 1 deletion collect/kube_coredns.go
@@ -51,7 +51,7 @@ func DoKubeCoreDnsCollect(cg *config.Config, logger log.Logger, dataMap *History
for k, v := range cg.AppendTags {
newtagsm[k] = v
}
metrics, err := CurlTlsMetricsApi(logger, funcName, c, newtagsm, cg.Step, cg.TimeOutSeconds)
metrics, err := CurlTlsMetricsApi(logger, funcName, c, newtagsm, cg.Step, cg.TimeOutSeconds, false)

if err != nil {
level.Error(logger).Log("msg", "CurlTlsMetricsResError", "func_name", funcName, "err:", err, "seq", fmt.Sprintf("%d/%d", index, allNum), "addr", c.Addr)
(The remaining changed files in this commit are not shown here.)
