Skip to content

Commit

Permalink
1.采集每node上的kube-proxy kubelet-node指标时支持并发数配置和静态分片
Browse files Browse the repository at this point in the history
2.服务组件采集支持并发数设置
3.每种项目配置了相关配置段才会开启,如果不想采集某类指标可以去掉其配置
  • Loading branch information
ning1875 committed Jan 19, 2021
1 parent 4a56689 commit 4254f3e
Show file tree
Hide file tree
Showing 23 changed files with 871 additions and 285 deletions.
22 changes: 1 addition & 21 deletions collect/apiserver.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,12 @@
package collect

import (
"github.com/didi/nightingale/src/common/dataobj"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/n9e/k8s-mon/config"
"time"
)

// DoApiServerCollect collects metrics for the api-server config section
// (cg.ApiServerC) and hands them over for pushing to cg.PushServerAddr.
//
// NOTE(review): this span is taken from a rendered diff (header: 1 addition,
// 21 deletions), so two implementations appear back to back: a sequential
// CurlMetricsByIps + PushWork path and a single call to
// ConcurrencyCurlMetricsByIpsSetNid. Presumably only the latter survives the
// commit; as rendered, an empty sequential result returns before the
// concurrent call is reached — confirm against the committed file.
func DoApiServerCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMap, funcName string) {
start := time.Now()
// collect metrics via CurlMetricsByIps using the api-server config section

metricList := CurlMetricsByIps(cg.ApiServerC, logger, dataMap, funcName, cg.AppendTags, cg.Step, cg.TimeOutSeconds, cg.MultiServerInstanceUniqueLabel)

if len(metricList) == 0 {
level.Error(logger).Log("msg", "FinallyCurlTlsMetricsResEmpty", "func_name", funcName)
return
}

// set nid on a copy of every metric (range yields copies, hence the second slice)
ml := make([]dataobj.MetricValue, 0)
for _, m := range metricList {
m.Nid = cg.ServerSideNid
ml = append(ml, m)

}
level.Info(logger).Log("msg", "DoCollectSuccessfullyReadyToPush", "funcName", funcName, "metrics_num", len(ml), "time_took_seconds", time.Since(start).Seconds())
// the push runs in its own goroutine; this function does not wait for it
go PushWork(cg.PushServerAddr, cg.TimeOutSeconds, ml, logger, funcName)
// concurrent path: address lookup, collection, nid assignment and push are all delegated
ConcurrencyCurlMetricsByIpsSetNid(cg.ApiServerC, logger, dataMap, funcName, cg.AppendTags, cg.Step, cg.TimeOutSeconds, cg.MultiServerInstanceUniqueLabel, cg.MultiFuncUniqueLabel, cg.ServerSideNid, cg.PushServerAddr)

}
157 changes: 139 additions & 18 deletions collect/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package collect

import (
"context"
"crypto/md5"
"fmt"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/go-kit/kit/log"
Expand All @@ -11,6 +12,7 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"time"
)

Expand Down Expand Up @@ -58,7 +60,7 @@ func GetServerAddrTicker(ctx context.Context, logger log.Logger, step int64, dat
for {
select {
case <-ticker.C:
GetServerAddrByGetPod(logger, dataMap)
GetServerAddrAll(logger, dataMap)
case <-ctx.Done():
level.Info(logger).Log("msg", "GetServerAddrTicker exit....")
return nil
Expand All @@ -74,7 +76,24 @@ func MapWhiteMetricsMap(metricsWhiteList []string) map[string]struct{} {
}
return m
}
func CurlMetricsByIps(cg *config.CommonApiServerConfig, logger log.Logger, dataMap *HistoryMap, funcName string, appendTags map[string]string, step int64, tw int64, multiServerInstanceUniqueLabel string) (metricList []dataobj.MetricValue) {

func GetServerSideAddr(cg *config.CommonApiServerConfig, logger log.Logger, dataMap *HistoryMap, funcName string) (metricUrlMap map[string]string) {
metricUrlMap = make(map[string]string)
if cg.UserSpecified && len(cg.UserSpecifyAddrs) > 0 {
for _, murl := range cg.UserSpecifyAddrs {
u, err := url.Parse(murl)
if err != nil {
level.Error(logger).Log("msg", "GetServerSideAddrParseUrlErrorByUserSpecifyAddrs",
"funcName", funcName,
"metricUrl", murl,
"err", err)
continue
}
metricUrlMap[u.Host] = murl
}

return
}
obj, loaded := dataMap.Map.Load(funcName)
if !loaded {
level.Error(logger).Log("msg", "GetServerAddrByGetPodErrorNoValue", "funcName:", funcName)
Expand All @@ -86,32 +105,134 @@ func CurlMetricsByIps(cg *config.CommonApiServerConfig, logger log.Logger, dataM
level.Error(logger).Log("msg", "GetServerAddrByGetPodErrorEmptyAddrs", "funcName:", funcName)
return
}
level.Debug(logger).Log("msg", "GetServerAddrByGetPodorNode", "funcName", funcName, "num", len(ips))
for _, ip := range ips {
murl := fmt.Sprintf("%s://%s:%d/%s", cg.Scheme, ip, cg.Port, cg.MetricsPath)
metricUrlMap[ip] = murl
}

return
}

// AsyncCurlMetricsAndPush scrapes a single target through CurlTlsMetricsApi,
// stamps serverSideNid on every returned metric and starts PushWork for the
// result in a separate goroutine.
//
// One value is received from controlChan when the function returns, on every
// return path, so the caller is expected to have sent one before starting it.
//
// Parameters:
//   - controlChan: channel a value is received from on exit.
//   - c: per-target config; handed to CurlTlsMetricsApi, c.Addr is also logged.
//   - m: passed through unchanged to CurlTlsMetricsApi.
//   - step, tw: passed to CurlTlsMetricsApi; tw is additionally passed to PushWork.
//   - index, allNum: only used to render the "seq" log field as "index/allNum".
//   - serverSideNid: value written to the Nid field of each metric.
//   - pushServerAddr: forwarded to PushWork.
//
// A scrape error or an empty result is logged at error level and nothing is pushed.
func AsyncCurlMetricsAndPush(controlChan chan int, c *config.CommonApiServerConfig, logger log.Logger, funcName string, m map[string]string, step int64, tw int64, index int, allNum int, serverSideNid string, pushServerAddr string) {
	begin := time.Now()
	defer func() { <-controlChan }()

	// Every log line below carries the same position marker, so build it once.
	seqLabel := fmt.Sprintf("%d/%d", index, allNum)

	collected, err := CurlTlsMetricsApi(logger, funcName, c, m, step, tw)
	switch {
	case err != nil:
		level.Error(logger).Log("msg", "CurlTlsMetricsResError", "func_name", funcName, "err:", err, "seq", seqLabel, "addr", c.Addr)
		return
	case len(collected) == 0:
		level.Error(logger).Log("msg", "CurlTlsMetricsResEmpty", "func_name", funcName, "seq", seqLabel, "addr", c.Addr)
		return
	}

	// Ranging yields copies, so the stamped metrics are gathered in a new slice.
	stamped := make([]dataobj.MetricValue, 0, len(collected))
	for _, mv := range collected {
		mv.Nid = serverSideNid
		stamped = append(stamped, mv)
	}

	level.Info(logger).Log("msg", "DoCollectSuccessfullyReadyToPush", "funcName", funcName, "seq", seqLabel, "metrics_num", len(stamped), "time_took_seconds", time.Since(begin).Seconds())
	go PushWork(pushServerAddr, tw, stamped, logger, funcName)
}

//func CurlMetricsByIps(cg *config.CommonApiServerConfig, logger log.Logger, dataMap *HistoryMap, funcName string, appendTags map[string]string, step int64, tw int64, multiServerInstanceUniqueLabel string) (metricList []dataobj.MetricValue) {
// metricUrlMap := GetServerSideAddr(cg, logger, dataMap, funcName)
// if len(metricUrlMap) == 0 {
// level.Error(logger).Log("msg", "GetServerSideAddrEmpty", "funcName:", funcName)
// return
// }
// seq := 0
// for uniqueHost, murl := range metricUrlMap {
// tmp := *cg
// c := &tmp
// c.Addr = murl
// // 添加service_addr tag
// m := map[string]string{
// multiServerInstanceUniqueLabel: uniqueHost,
// }
// for k, v := range appendTags {
// m[k] = v
// }
//
// ms, err := CurlTlsMetricsApi(logger, funcName, c, m, step, tw)
//
// if err != nil {
// level.Error(logger).Log("msg", "CurlTlsMetricsResError", "func_name", funcName, "err:", err, "seq", fmt.Sprintf("%d/%d", seq+1, len(metricUrlMap)), "addr", c.Addr)
// //return
// }
// if len(ms) == 0 {
// level.Error(logger).Log("msg", "CurlTlsMetricsResEmpty", "func_name", funcName, "seq", fmt.Sprintf("%d/%d", seq+1, len(metricUrlMap)), "addr", c.Addr)
// //return
// }
// metricList = append(metricList, ms...)
// seq += 1
// }
// return
//}

// sum64 folds an MD5 digest into a uint64.
//
// Only the last eight bytes of the digest contribute, read in big-endian
// order (hash[8] ends up in the most significant byte, hash[15] in the least
// significant one). The first eight bytes are ignored: placing them would
// need shifts of 64 bits or more, which yield zero for a uint64.
func sum64(hash [md5.Size]byte) uint64 {
	const width = 8 // number of bytes that fit in a uint64

	var folded uint64
	for _, octet := range hash[md5.Size-width:] {
		folded = folded<<8 | uint64(octet)
	}
	return folded
}

// ConcurrencyCurlMetricsByIpsSetNid resolves the target URLs for funcName via
// GetServerSideAddr and starts one AsyncCurlMetricsAndPush goroutine per
// selected target, with at most cg.ConcurrencyLimit of them in flight.
//
// Targets are optionally filtered by static sharding: when cg.HashModNum or
// cg.HashModShard is non-zero, only URLs whose sum64(md5(url)) modulo
// cg.HashModNum equals cg.HashModShard are collected.
//
// NOTE(review): this span is taken from a rendered diff, so lines of the
// removed sequential implementation are interleaved with the new one. The
// identifiers ips, index, addr and metricList are not declared anywhere in
// this function and the braces do not balance as rendered — confirm the real
// body against the committed file.
func ConcurrencyCurlMetricsByIpsSetNid(cg *config.CommonApiServerConfig, logger log.Logger, dataMap *HistoryMap, funcName string, appendTags map[string]string, step int64, tw int64, multiServerInstanceUniqueLabel string, multiFuncUniqueLabel string, serverSideNid string, pushServerAddr string) {
metricUrlMap := GetServerSideAddr(cg, logger, dataMap, funcName)
if len(metricUrlMap) == 0 {
level.Error(logger).Log("msg", "GetServerSideAddrEmpty", "funcName:", funcName)
return
}
// seq counts dispatched targets; it feeds the "seq" position passed to the worker
seq := 0
// a zero limit falls back to 10; the default is written back through the cg pointer
if cg.ConcurrencyLimit == 0 {
cg.ConcurrencyLimit = 10
}
// buffered channel used as a counting semaphore: one slot is taken before each
// goroutine starts (send below) and released by AsyncCurlMetricsAndPush on exit
controlChan := make(chan int, cg.ConcurrencyLimit)

for uniqueHost, murl := range metricUrlMap {
// sharding is bypassed only when both HashModNum and HashModShard are zero
// NOTE(review): with HashModNum == 0 and HashModShard != 0 the modulo below
// divides by zero — confirm that config validation rules this combination out
if cg.HashModNum == 0 && cg.HashModShard == 0 {
goto collect
} else {
// keep only the targets whose URL hash falls into this instance's shard
mod := sum64(md5.Sum([]byte(murl))) % cg.HashModNum
level.Debug(logger).Log("msg", "metricsHashModEnabled",
"funcName:", funcName,
"cg.HashModNum:", cg.HashModNum,
"cg.HashModShard:", cg.HashModShard,
"mod:", mod,
)

if mod != cg.HashModShard {
continue
}
goto collect
}

// NOTE(review): both branches above jump or continue, so this statement looks
// like residue of the removed implementation (ips is undeclared here)
level.Info(logger).Log("msg", "GetServerAddrByGetPod", "funcName", funcName, "num", len(ips))
collect:
// blocks while ConcurrencyLimit collections are already in flight
controlChan <- 1

for index, addr := range ips {
// work on a copy of the config so each target carries its own Addr
tmp := *cg
c := &tmp
c.Addr = fmt.Sprintf("%s://%s:%d/%s", c.Scheme, addr, c.Port, c.MetricsPath)
c.Addr = murl
// add the service_addr tag
m := map[string]string{
multiServerInstanceUniqueLabel: addr,
multiServerInstanceUniqueLabel: uniqueHost,
multiFuncUniqueLabel: funcName,
}
// caller-supplied tags are applied last and therefore win on key collisions
for k, v := range appendTags {
m[k] = v
}

ms, err := CurlTlsMetricsApi(logger, funcName, c, m, step, tw)

if err != nil {
level.Error(logger).Log("msg", "CurlTlsMetricsResError", "func_name", funcName, "err:", err, "seq", fmt.Sprintf("%d/%d", index+1, len(ips)), "addr", c.Addr)
//return
}
if len(ms) == 0 {
level.Error(logger).Log("msg", "CurlTlsMetricsResEmpty", "func_name", funcName, "seq", fmt.Sprintf("%d/%d", index+1, len(ips)), "addr", c.Addr)
//return
}
metricList = append(metricList, ms...)
// the worker is not waited for here; this function only dispatches
go AsyncCurlMetricsAndPush(controlChan, c, logger, funcName, m, step, tw, seq+1, len(metricUrlMap), serverSideNid, pushServerAddr)
seq += 1
}
return
}
Expand Down
10 changes: 1 addition & 9 deletions collect/const.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
package collect

// Component name identifiers and the server-address tag key.
// NOTE(review): presumably these are the funcName values passed to the
// collectors; the diff header (1 addition, 9 deletions) suggests this group
// is removed by the commit and only the empty group below remains — confirm.
const (
FUNCNAME_APISERVER = "api-server"
FUNCNAME_KUBESCHEDULER = "kube-scheduler"
FUNCNAME_KUBECONTROLLER = "kube-controller-manager"
FUNCNAME_COREDNS = "coredns"
FUNCNAME_KUBELET = "kubelet"
FUNCNAME_KUBESTATSMETRICS = "kube-stats-metrics"
APPENDTAG_SERVER_ADDR = "server_addr"
)
// Empty const group: declares nothing.
const ()
19 changes: 1 addition & 18 deletions collect/coredns.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,12 @@
package collect

import (
"github.com/didi/nightingale/src/common/dataobj"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/n9e/k8s-mon/config"
"time"
)

// DoKubeCoreDnsCollect collects metrics for the coredns config section
// (cg.CoreDnsC) and hands them over for pushing to cg.PushServerAddr.
//
// NOTE(review): this span is taken from a rendered diff (header: 1 addition,
// 18 deletions), so the sequential CurlMetricsByIps + PushWork path and the
// single ConcurrencyCurlMetricsByIpsSetNid call appear interleaved.
// Presumably only the latter survives the commit — confirm against the
// committed file.
func DoKubeCoreDnsCollect(cg *config.Config, logger log.Logger, dataMap *HistoryMap, funcName string) {
start := time.Now()
metricList := CurlMetricsByIps(cg.CoreDnsC, logger, dataMap, funcName, cg.AppendTags, cg.Step, cg.TimeOutSeconds, cg.MultiServerInstanceUniqueLabel)

if len(metricList) == 0 {
level.Error(logger).Log("msg", "FinallyCurlTlsMetricsResEmpty", "func_name", funcName)
return
}
// concurrent path: address lookup, collection, nid assignment and push are all delegated
ConcurrencyCurlMetricsByIpsSetNid(cg.CoreDnsC, logger, dataMap, funcName, cg.AppendTags, cg.Step, cg.TimeOutSeconds, cg.MultiServerInstanceUniqueLabel, cg.MultiFuncUniqueLabel, cg.ServerSideNid, cg.PushServerAddr)

// set nid on a copy of every metric (range yields copies, hence the second slice)
ml := make([]dataobj.MetricValue, 0)
for _, m := range metricList {
m.Nid = cg.ServerSideNid
ml = append(ml, m)

}
level.Info(logger).Log("msg", "DoCollectSuccessfullyReadyToPush", "funcName", funcName, "metrics_num", len(ml), "time_took_seconds", time.Since(start).Seconds())

// the push runs in its own goroutine; this function does not wait for it
go PushWork(cg.PushServerAddr, cg.TimeOutSeconds, ml, logger, funcName)
}
58 changes: 58 additions & 0 deletions collect/get_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package collect

import (
"context"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
kconfig "github.com/n9e/k8s-mon/config"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"time"
)

// GetServerAddrByGetNode lists the cluster's nodes through the in-cluster
// Kubernetes API and stores every address of type NodeInternalIP in dataMap
// under the key kconfig.FUNCNAME_KUBELET_NODE.
//
// Failures (no in-cluster config, clientset creation, node listing) are
// logged at error level and end the refresh without touching dataMap. When no
// internal IP is found nothing is stored either, so an earlier value stays in
// place. The outcome, including an empty one, is reported in the
// "server_node_ips_result" info log.
func GetServerAddrByGetNode(logger log.Logger, dataMap *HistoryMap) {
	start := time.Now()

	// Build the client from the pod's in-cluster credentials.
	restCfg, err := rest.InClusterConfig()
	if err != nil {
		level.Error(logger).Log("msg", "create_k8s_InClusterConfig_error", "err:", err)
		return
	}
	clientset, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		level.Error(logger).Log("msg", "creates_the_clientset_error", "err:", err)
		return
	}

	nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		// This call lists nodes; the message used to name the kube-system pod listing.
		level.Error(logger).Log("msg", "list_node_error", "err:", err)
		return
	}

	// Index into Items rather than ranging by value: each Node is a large struct.
	nodeIps := make([]string, 0, len(nodes.Items))
	for i := range nodes.Items {
		for _, a := range nodes.Items[i].Status.Addresses {
			if a.Type == apiv1.NodeInternalIP {
				nodeIps = append(nodeIps, a.Address)
			}
		}
	}

	level.Info(logger).Log("msg", "server_node_ips_result",
		"num_nodeIps", len(nodeIps),
		"time_took_seconds", time.Since(start).Seconds(),
	)

	// Keep the previously stored list when this round found nothing.
	if len(nodeIps) > 0 {
		dataMap.Map.Store(kconfig.FUNCNAME_KUBELET_NODE, nodeIps)
	}
}
Loading

0 comments on commit 4254f3e

Please sign in to comment.