Skip to content

Commit

Permalink
feat: add getting cluster collection stats api (#27)
Browse files Browse the repository at this point in the history
* fix: empty index metrics in overview info api when collecting mode is agent

* feat: add getting cluster monitor state api

* chore: updating monitor state api

* chore: custom request timeout error when querying metrics
  • Loading branch information
silenceqi authored Dec 13, 2024
1 parent 020c8aa commit 5a02966
Show file tree
Hide file tree
Showing 18 changed files with 390 additions and 117 deletions.
16 changes: 16 additions & 0 deletions core/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
package core

import (
"context"
"errors"
"fmt"
cerr "infini.sh/console/core/errors"
"infini.sh/console/core/security"
"infini.sh/framework/core/api"
httprouter "infini.sh/framework/core/api/router"
Expand All @@ -37,6 +41,18 @@ type Handler struct {
api.Handler
}

// WriteError writes err to the response with the given HTTP status by
// delegating to the embedded api.Handler.
//
// The message sent depends on the dynamic type of err:
//   - an error matching context.DeadlineExceeded is rendered as a
//     request_timeout_error built with cerr.New;
//   - any other error is rendered with its Error() text;
//   - every other value is rendered with the %v verb.
func (handler Handler) WriteError(w http.ResponseWriter, err interface{}, status int) {
	var message string
	switch e, isErr := err.(error); {
	case !isErr:
		message = fmt.Sprintf("%v", err)
	case errors.Is(e, context.DeadlineExceeded):
		message = cerr.New(cerr.ErrTypeRequestTimeout, "", err).Error()
	default:
		message = e.Error()
	}
	handler.Handler.WriteError(w, message, status)
}

func (handler Handler) RequireLogin(h httprouter.Handle) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {

Expand Down
83 changes: 83 additions & 0 deletions core/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (C) INFINI Labs & INFINI LIMITED.
//
// The INFINI Console is offered under the GNU Affero General Public License v3.0
// and as commercial software.
//
// For commercial licensing, contact us at:
// - Website: infinilabs.com
// - Email: [email protected]
//
// Open Source licensed under AGPL V3:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package errors

import (
"fmt"
"infini.sh/framework/core/errors"
)

// Error type identifiers. A value from this list is passed as the typ argument
// of New and becomes the leading segment of the string returned by Error.Error.
const (
// ErrTypeRequestParams is the type used by NewParamsError.
ErrTypeRequestParams = "request_params_error"
// ErrTypeApplication is the type used by NewAppError.
ErrTypeApplication = "application_error"
// ErrTypeAlreadyExists is the type used by NewAlreadyExistsError.
ErrTypeAlreadyExists = "already_exists_error"
// ErrTypeNotExists is the type used by NewNotExistsError.
ErrTypeNotExists = "not_exists_error"
ErrTypeIncorrectPassword = "incorrect_password_error"
ErrTypeDomainPrefixMismatch = "domain_prefix_mismatch_error"
ErrTypeDisabled = "disabled_error"
ErrTypeRequestTimeout = "request_timeout_error"
)

// Predefined package-level error values, created with the framework errors
// package (infini.sh/framework/core/errors), not the standard library one.
var (
// ErrPasswordIncorrect carries the message "incorrect password".
ErrPasswordIncorrect = errors.New("incorrect password")
// ErrNotExistsErr carries the message "not exists".
ErrNotExistsErr = errors.New("not exists")
)

// Error is a typed error value made of an error type identifier, an optional
// field name and an arbitrary message payload.
//
// The field order is significant: New builds the struct with a positional
// composite literal.
type Error struct {
	typ   string      // error type identifier, e.g. one of the ErrType* constants
	msg   interface{} // message payload, formatted with the default %v rules
	field string      // name of the field the error relates to; may be empty
}

// Error implements the error interface. The result has the layout
// "<typ>:<field>: <msg>", so an empty field yields "<typ>:: <msg>".
func (e Error) Error() string {
	// fmt.Sprint with a single operand applies the same default formatting
	// as the %v verb.
	return e.typ + ":" + e.field + ": " + fmt.Sprint(e.msg)
}

// NewAppError returns an Error of type ErrTypeApplication with an empty field
// and the given message.
func NewAppError(msg any) *Error {
	return &Error{typ: ErrTypeApplication, msg: msg, field: ""}
}

// NewParamsError returns an Error of type ErrTypeRequestParams for the given
// field and message.
func NewParamsError(field string, msg any) *Error {
	return &Error{typ: ErrTypeRequestParams, msg: msg, field: field}
}

// NewAlreadyExistsError returns an Error of type ErrTypeAlreadyExists for the
// given field and message.
func NewAlreadyExistsError(field string, msg any) *Error {
	return &Error{typ: ErrTypeAlreadyExists, msg: msg, field: field}
}

// NewNotExistsError returns an Error of type ErrTypeNotExists for the given
// field and message.
func NewNotExistsError(field string, msg any) *Error {
	return &Error{typ: ErrTypeNotExists, msg: msg, field: field}
}

// New returns a pointer to an Error with the given type identifier, field name
// and message. The other New*Error constructors in this file delegate to it
// with a fixed type.
func New(typ string, field string, msg any) *Error {
	e := Error{
		typ:   typ,
		field: field,
		msg:   msg,
	}
	return &e
}
99 changes: 96 additions & 3 deletions modules/elastic/api/cluster_overview.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ package api

import (
"context"
"errors"
"fmt"
cerr "infini.sh/console/core/errors"
"infini.sh/framework/modules/elastic/adapter"
"net/http"
"strings"
Expand Down Expand Up @@ -263,7 +265,16 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request,
}
ctx, cancel := context.WithTimeout(context.Background(), du)
defer cancel()
indexMetrics := h.getMetrics(ctx, query, indexMetricItems, bucketSize)
indexMetrics, err := h.getMetrics(ctx, query, indexMetricItems, bucketSize)
if err != nil {
log.Error(err)
if errors.Is(err, context.DeadlineExceeded) {
h.WriteError(w, cerr.New(cerr.ErrTypeRequestTimeout, "", err).Error(), http.StatusRequestTimeout)
return
}
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
indexingMetricData := util.MapStr{}
for _, line := range indexMetrics["cluster_indexing"].Lines {
// remove first metric dot
Expand Down Expand Up @@ -738,7 +749,7 @@ func (h *APIHandler) GetRealtimeClusterNodes(w http.ResponseWriter, req *http.Re

func (h *APIHandler) GetClusterIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.ByName("id")
if GetMonitorState(id) == Console {
if GetMonitorState(id) == elastic.ModeAgentless {
h.APIHandler.GetClusterIndices(w, req, ps)
return
}
Expand Down Expand Up @@ -774,7 +785,7 @@ func (h *APIHandler) GetClusterIndices(w http.ResponseWriter, req *http.Request,
func (h *APIHandler) GetRealtimeClusterIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
resBody := map[string]interface{}{}
id := ps.ByName("id")
if GetMonitorState(id) == Console {
if GetMonitorState(id) == elastic.ModeAgentless {
h.APIHandler.GetRealtimeClusterIndices(w, req, ps)
return
}
Expand Down Expand Up @@ -1327,3 +1338,85 @@ func (h *APIHandler) SearchClusterMetadata(w http.ResponseWriter, req *http.Requ
}
w.Write(util.MustToJSONBytes(response))
}

// getClusterMonitorState responds with the metric collection mode of the
// cluster named by the "id" route parameter, plus one entry per tracked metric
// name describing when that metric was last written.
//
// It runs a terms aggregation on metadata.name (restricted to the cluster id
// and the "elasticsearch" category) with a max(timestamp) sub-aggregation
// against the system Elasticsearch cluster, then converts each relevant bucket
// with getCollectionStats. Which metric names are relevant depends on the
// collection mode: index_stats is reported in agentless mode, shard_stats
// otherwise; cluster_health, cluster_stats and node_stats are reported in both.
//
// On a search failure the error is logged and a 500 response is written.
func (h *APIHandler) getClusterMonitorState(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	id := ps.ByName("id")
	collectionMode := GetMonitorState(id)
	ret := util.MapStr{
		"cluster_id":             id,
		"metric_collection_mode": collectionMode,
	}
	queryDSL := util.MapStr{
		"query": util.MapStr{
			"bool": util.MapStr{
				"must": []util.MapStr{
					{
						"term": util.MapStr{
							"metadata.labels.cluster_id": id,
						},
					},
					{
						"term": util.MapStr{
							"metadata.category": "elasticsearch",
						},
					},
				},
			},
		},
		// Only the aggregation result is needed, not the hits.
		"size": 0,
		"aggs": util.MapStr{
			"grp_name": util.MapStr{
				"terms": util.MapStr{
					"field": "metadata.name",
					"size":  10,
				},
				"aggs": util.MapStr{
					"max_timestamp": util.MapStr{
						"max": util.MapStr{
							"field": "timestamp",
						},
					},
				},
			},
		},
	}
	dsl := util.MustToJSONBytes(queryDSL)
	response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).SearchWithRawQueryDSL(getAllMetricsIndex(), dsl)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// The set of metric names to report is fixed for the whole request, so it
	// is selected once instead of being rebuilt for every bucket.
	trackedMetrics := []string{"shard_stats", "cluster_health", "cluster_stats", "node_stats"}
	if collectionMode == elastic.ModeAgentless {
		trackedMetrics = []string{"index_stats", "cluster_health", "cluster_stats", "node_stats"}
	}

	for _, bk := range response.Aggregations["grp_name"].Buckets {
		// Use the comma-ok form: a bucket whose key is missing or not a string
		// is skipped instead of panicking the handler.
		key, ok := bk["key"].(string)
		if !ok {
			continue
		}
		tv, ok := bk["max_timestamp"].(map[string]interface{})
		if !ok {
			continue
		}
		if util.StringInArray(trackedMetrics, key) {
			ret[key] = getCollectionStats(tv["value"])
		}
	}
	h.WriteJSON(w, ret, http.StatusOK)
}

// getCollectionStats builds the status entry for one metric from the value of
// its last-activity timestamp.
//
// lastActiveAt is echoed back unchanged under "last_active_at". When it is a
// float64 it is interpreted as milliseconds since the Unix epoch (truncated to
// whole seconds): "status" is "warning" if that instant is more than five
// minutes in the past and "ok" otherwise. For any other dynamic type the
// status stays at its initial value, "active".
func getCollectionStats(lastActiveAt interface{}) util.MapStr {
	status := "active"
	if millis, isFloat := lastActiveAt.(float64); isFloat {
		lastSeen := time.Unix(int64(millis/1000), 0)
		if time.Since(lastSeen) > 5*time.Minute {
			status = "warning"
		} else {
			status = "ok"
		}
	}
	return util.MapStr{
		"last_active_at": lastActiveAt,
		"status":         status,
	}
}
44 changes: 32 additions & 12 deletions modules/elastic/api/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,12 @@ func (h *APIHandler) FetchHostInfo(w http.ResponseWriter, req *http.Request, ps
Units: "/s",
},
}
hostMetrics := h.getGroupHostMetric(context.Background(), agentIDs, min, max, bucketSize, hostMetricItems, "agent.id")
hostMetrics, err := h.getGroupHostMetric(context.Background(), agentIDs, min, max, bucketSize, hostMetricItems, "agent.id")
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}

networkMetrics := map[string]util.MapStr{}
for key, item := range hostMetrics {
Expand Down Expand Up @@ -572,7 +577,7 @@ func (h *APIHandler) GetHostInfo(w http.ResponseWriter, req *http.Request, ps ht

}

func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, min, max int64, bucketSize int, metricItems []*common.MetricItem) map[string]*common.MetricItem {
func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, min, max int64, bucketSize int, metricItems []*common.MetricItem) (map[string]*common.MetricItem, error) {
var must = []util.MapStr{
{
"term": util.MapStr{
Expand Down Expand Up @@ -608,7 +613,7 @@ func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, mi
return h.getSingleMetrics(ctx, metricItems, query, bucketSize)
}

func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID string, min, max int64, bucketSize int, metricKey string) map[string]*common.MetricItem {
func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID string, min, max int64, bucketSize int, metricKey string) (map[string]*common.MetricItem, error) {
var must = []util.MapStr{
{
"term": util.MapStr{
Expand Down Expand Up @@ -725,7 +730,12 @@ func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Reque
ctx, cancel := context.WithTimeout(context.Background(), du)
defer cancel()
if hostInfo.AgentID == "" {
resBody["metrics"] = h.getSingleHostMetricFromNode(ctx, hostInfo.NodeID, min, max, bucketSize, key)
resBody["metrics"], err = h.getSingleHostMetricFromNode(ctx, hostInfo.NodeID, min, max, bucketSize, key)
if err != nil {
log.Error(err)
h.WriteError(w, err, http.StatusInternalServerError)
return
}
h.WriteJSON(w, resBody, http.StatusOK)
return
}
Expand Down Expand Up @@ -788,20 +798,30 @@ func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Reque
metricItem.AddLine("Disk Write Rate", "Disk Write Rate", "network write rate of host.", "group1", "payload.host.diskio_summary.write.bytes", "max", bucketSizeStr, "%", "bytes", "0,0.[00]", "0,0.[00]", false, true)
metricItems = append(metricItems, metricItem)
case DiskPartitionUsageMetricKey, NetworkInterfaceOutputRateMetricKey:
groupMetrics := h.getGroupHostMetrics(ctx, hostInfo.AgentID, min, max, bucketSize, key)
resBody["metrics"] = groupMetrics
resBody["metrics"] , err = h.getGroupHostMetrics(ctx, hostInfo.AgentID, min, max, bucketSize, key)
if err != nil {
log.Error(err)
h.WriteError(w, err, http.StatusInternalServerError)
return
}
h.WriteJSON(w, resBody, http.StatusOK)
return
}
hostMetrics := h.getSingleHostMetric(ctx, hostInfo.AgentID, min, max, bucketSize, metricItems)
hostMetrics, err := h.getSingleHostMetric(ctx, hostInfo.AgentID, min, max, bucketSize, metricItems)
if err != nil {
log.Error(err)
h.WriteError(w, err, http.StatusInternalServerError)
return
}

resBody["metrics"] = hostMetrics

h.WriteJSON(w, resBody, http.StatusOK)
}

func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, min, max int64, bucketSize int, metricKey string) map[string]*common.MetricItem {
func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, min, max int64, bucketSize int, metricKey string) (map[string]*common.MetricItem, error) {
var metrics = make(map[string]*common.MetricItem)
var err error
switch metricKey {
case DiskPartitionUsageMetricKey:
diskPartitionMetric := newMetricItem(DiskPartitionUsageMetricKey, 2, SystemGroupKey)
Expand All @@ -817,7 +837,7 @@ func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, mi
Units: "%",
},
}
metrics = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.disk_partition_usage.partition")
metrics, err = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.disk_partition_usage.partition")
case NetworkInterfaceOutputRateMetricKey:
networkOutputMetric := newMetricItem(NetworkInterfaceOutputRateMetricKey, 2, SystemGroupKey)
networkOutputMetric.AddAxi("Network interface output rate", "group1", common.PositionLeft, "bytes", "0.[0]", "0.[0]", 5, true)
Expand All @@ -832,13 +852,13 @@ func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, mi
Units: "",
},
}
metrics = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.network_interface.name")
metrics, err = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.network_interface.name")
}

return metrics
return metrics, err
}

func (h *APIHandler) getGroupHostMetric(ctx context.Context, agentIDs []string, min, max int64, bucketSize int, hostMetricItems []GroupMetricItem, groupField string) map[string]*common.MetricItem {
func (h *APIHandler) getGroupHostMetric(ctx context.Context, agentIDs []string, min, max int64, bucketSize int, hostMetricItems []GroupMetricItem, groupField string) (map[string]*common.MetricItem, error) {
var must = []util.MapStr{
{
"term": util.MapStr{
Expand Down
2 changes: 1 addition & 1 deletion modules/elastic/api/index_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu
},
},
}
return h.getMetrics(ctx, query, indexMetricItems, bucketSize), nil
return h.getMetrics(ctx, query, indexMetricItems, bucketSize)

}

Expand Down
Loading

0 comments on commit 5a02966

Please sign in to comment.