Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add getting cluster collection stats api #27

Merged
merged 4 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions core/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
package core

import (
"context"
"errors"
"fmt"
cerr "infini.sh/console/core/errors"
"infini.sh/console/core/security"
"infini.sh/framework/core/api"
httprouter "infini.sh/framework/core/api/router"
Expand All @@ -37,6 +41,18 @@ type Handler struct {
api.Handler
}

// WriteError normalizes err into a message string and delegates to the
// embedded api.Handler's WriteError with the given status.
//
//   - An error matching context.DeadlineExceeded is rendered as a
//     cerr.ErrTypeRequestTimeout error string.
//   - Any other error is rendered with its Error() text.
//   - Any non-error value is rendered with the fmt "%v" verb.
func (handler Handler) WriteError(w http.ResponseWriter, err interface{}, status int) {
	var message string
	switch e := err.(type) {
	case error:
		if errors.Is(e, context.DeadlineExceeded) {
			// Tag timeouts with a dedicated error type in the rendered message.
			message = cerr.New(cerr.ErrTypeRequestTimeout, "", err).Error()
		} else {
			message = e.Error()
		}
	default:
		message = fmt.Sprintf("%v", err)
	}
	handler.Handler.WriteError(w, message, status)
}

func (handler Handler) RequireLogin(h httprouter.Handle) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {

Expand Down
83 changes: 83 additions & 0 deletions core/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (C) INFINI Labs & INFINI LIMITED.
//
// The INFINI Console is offered under the GNU Affero General Public License v3.0
// and as commercial software.
//
// For commercial licensing, contact us at:
// - Website: infinilabs.com
// - Email: hello@infini.ltd
//
// Open Source licensed under AGPL V3:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package errors

import (
"fmt"
"infini.sh/framework/core/errors"
)

// Error type identifiers. One of these is passed as the typ argument of New
// and is rendered as the leading segment of the string returned by
// Error.Error().
const (
// ErrTypeRequestParams is the type used by NewParamsError.
ErrTypeRequestParams = "request_params_error"
// ErrTypeApplication is the type used by NewAppError.
ErrTypeApplication = "application_error"
// ErrTypeAlreadyExists is the type used by NewAlreadyExistsError.
ErrTypeAlreadyExists = "already_exists_error"
// ErrTypeNotExists is the type used by NewNotExistsError.
ErrTypeNotExists = "not_exists_error"
ErrTypeIncorrectPassword = "incorrect_password_error"
ErrTypeDomainPrefixMismatch = "domain_prefix_mismatch_error"
ErrTypeDisabled = "disabled_error"
// ErrTypeRequestTimeout has no dedicated constructor in this file; callers
// pass it to New directly.
ErrTypeRequestTimeout = "request_timeout_error"
)

// Package-level error values created once with the framework errors package.
// NOTE(review): presumably intended as sentinels for callers to compare
// against; no use is visible in this file, so confirm against callers.
var (
// ErrPasswordIncorrect carries the message "incorrect password".
ErrPasswordIncorrect = errors.New("incorrect password")
// ErrNotExistsErr carries the message "not exists".
ErrNotExistsErr = errors.New("not exists")
)

// Error is a typed error value: it carries an error type identifier (one of
// the ErrType* constants), an optional field name the error relates to, and
// an arbitrary message payload.
type Error struct {
	// typ is the error type identifier, e.g. ErrTypeRequestParams.
	typ string
	// msg is the payload; it may be a string, another error, or any value
	// printable with the fmt "%v" verb.
	msg interface{}
	// field names the offending field; empty when not applicable.
	field string
}

// Error implements the error interface. The result has the form
// "<typ>:<field>: <msg>"; an empty field yields "<typ>:: <msg>".
func (err Error) Error() string {
	return fmt.Sprintf("%s:%v: %v", err.typ, err.field, err.msg)
}

// Unwrap returns the wrapped cause when the message payload is itself an
// error, so that errors.Is and errors.As can inspect it (for example to
// detect context.DeadlineExceeded after it has been wrapped). It returns nil
// when the payload is not an error.
func (err Error) Unwrap() error {
	if cause, ok := err.msg.(error); ok {
		return cause
	}
	return nil
}

// NewAppError builds an Error of type ErrTypeApplication carrying msg, with
// no field name set.
func NewAppError(msg any) *Error {
	const noField = ""
	return New(ErrTypeApplication, noField, msg)
}

// NewParamsError builds an Error of type ErrTypeRequestParams for the given
// request field, carrying msg.
func NewParamsError(field string, msg any) *Error {
	paramsErr := New(ErrTypeRequestParams, field, msg)
	return paramsErr
}

// NewAlreadyExistsError builds an Error of type ErrTypeAlreadyExists for the
// given field, carrying msg.
func NewAlreadyExistsError(field string, msg any) *Error {
	existsErr := New(ErrTypeAlreadyExists, field, msg)
	return existsErr
}

// NewNotExistsError builds an Error of type ErrTypeNotExists for the given
// field, carrying msg.
func NewNotExistsError(field string, msg any) *Error {
	missingErr := New(ErrTypeNotExists, field, msg)
	return missingErr
}

// New creates an Error with the given type identifier, field name and
// message payload. Fields are set by name so the result does not depend on
// the declaration order of the Error struct.
func New(typ string, field string, msg any) *Error {
	return &Error{
		typ:   typ,
		field: field,
		msg:   msg,
	}
}
99 changes: 96 additions & 3 deletions modules/elastic/api/cluster_overview.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ package api

import (
"context"
"errors"
"fmt"
cerr "infini.sh/console/core/errors"
"infini.sh/framework/modules/elastic/adapter"
"net/http"
"strings"
Expand Down Expand Up @@ -263,7 +265,16 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request,
}
ctx, cancel := context.WithTimeout(context.Background(), du)
defer cancel()
indexMetrics := h.getMetrics(ctx, query, indexMetricItems, bucketSize)
indexMetrics, err := h.getMetrics(ctx, query, indexMetricItems, bucketSize)
if err != nil {
log.Error(err)
if errors.Is(err, context.DeadlineExceeded) {
h.WriteError(w, cerr.New(cerr.ErrTypeRequestTimeout, "", err).Error(), http.StatusRequestTimeout)
return
}
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
indexingMetricData := util.MapStr{}
for _, line := range indexMetrics["cluster_indexing"].Lines {
// remove first metric dot
Expand Down Expand Up @@ -738,7 +749,7 @@ func (h *APIHandler) GetRealtimeClusterNodes(w http.ResponseWriter, req *http.Re

func (h *APIHandler) GetClusterIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.ByName("id")
if GetMonitorState(id) == Console {
if GetMonitorState(id) == elastic.ModeAgentless {
h.APIHandler.GetClusterIndices(w, req, ps)
return
}
Expand Down Expand Up @@ -774,7 +785,7 @@ func (h *APIHandler) GetClusterIndices(w http.ResponseWriter, req *http.Request,
func (h *APIHandler) GetRealtimeClusterIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
resBody := map[string]interface{}{}
id := ps.ByName("id")
if GetMonitorState(id) == Console {
if GetMonitorState(id) == elastic.ModeAgentless {
h.APIHandler.GetRealtimeClusterIndices(w, req, ps)
return
}
Expand Down Expand Up @@ -1327,3 +1338,85 @@ func (h *APIHandler) SearchClusterMetadata(w http.ResponseWriter, req *http.Requ
}
w.Write(util.MustToJSONBytes(response))
}

// getClusterMonitorState reports the metric collection mode of the cluster
// identified by the "id" route parameter, together with per-metric collection
// stats.
//
// It runs a terms aggregation on metadata.name (top 10 buckets) with a max
// aggregation on timestamp, over the index returned by getAllMetricsIndex(),
// filtered to the cluster id and the "elasticsearch" category. For each
// tracked metric name found it adds an entry built by getCollectionStats from
// the bucket's max timestamp. On search failure it logs the error and
// responds with HTTP 500; otherwise it writes the result as JSON with HTTP 200.
func (h *APIHandler) getClusterMonitorState(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	id := ps.ByName("id")
	collectionMode := GetMonitorState(id)
	ret := util.MapStr{
		"cluster_id":             id,
		"metric_collection_mode": collectionMode,
	}
	queryDSL := util.MapStr{
		"query": util.MapStr{
			"bool": util.MapStr{
				"must": []util.MapStr{
					{
						"term": util.MapStr{
							"metadata.labels.cluster_id": id,
						},
					},
					{
						"term": util.MapStr{
							"metadata.category": "elasticsearch",
						},
					},
				},
			},
		},
		"size": 0,
		"aggs": util.MapStr{
			"grp_name": util.MapStr{
				"terms": util.MapStr{
					"field": "metadata.name",
					"size":  10,
				},
				"aggs": util.MapStr{
					"max_timestamp": util.MapStr{
						"max": util.MapStr{
							"field": "timestamp",
						},
					},
				},
			},
		},
	}
	dsl := util.MustToJSONBytes(queryDSL)
	response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).SearchWithRawQueryDSL(getAllMetricsIndex(), dsl)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// The tracked metric names depend only on the collection mode, so pick
	// them once instead of rebuilding the list for every bucket. Agentless
	// mode tracks index_stats; every other mode tracks shard_stats instead.
	trackedMetrics := []string{"shard_stats", "cluster_health", "cluster_stats", "node_stats"}
	if collectionMode == elastic.ModeAgentless {
		trackedMetrics = []string{"index_stats", "cluster_health", "cluster_stats", "node_stats"}
	}

	for _, bk := range response.Aggregations["grp_name"].Buckets {
		// Skip malformed buckets rather than panicking on a failed assertion.
		key, ok := bk["key"].(string)
		if !ok {
			continue
		}
		tv, ok := bk["max_timestamp"].(map[string]interface{})
		if !ok {
			continue
		}
		if util.StringInArray(trackedMetrics, key) {
			ret[key] = getCollectionStats(tv["value"])
		}
	}
	h.WriteJSON(w, ret, http.StatusOK)
}

// getCollectionStats builds the stats entry for one collected metric.
//
// lastActiveAt is echoed back under "last_active_at". When it is a float64 it
// is treated as a timestamp in milliseconds (truncated to whole seconds):
// "status" is "warning" if that instant is more than five minutes in the
// past, otherwise "ok". For any other type "status" stays "active".
func getCollectionStats(lastActiveAt interface{}) util.MapStr {
	const staleAfter = 5 * time.Minute

	status := "active"
	if millis, isNumber := lastActiveAt.(float64); isNumber {
		lastSeen := time.Unix(int64(millis/1000), 0)
		if time.Since(lastSeen) > staleAfter {
			status = "warning"
		} else {
			status = "ok"
		}
	}
	return util.MapStr{
		"last_active_at": lastActiveAt,
		"status":         status,
	}
}
44 changes: 32 additions & 12 deletions modules/elastic/api/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,12 @@ func (h *APIHandler) FetchHostInfo(w http.ResponseWriter, req *http.Request, ps
Units: "/s",
},
}
hostMetrics := h.getGroupHostMetric(context.Background(), agentIDs, min, max, bucketSize, hostMetricItems, "agent.id")
hostMetrics, err := h.getGroupHostMetric(context.Background(), agentIDs, min, max, bucketSize, hostMetricItems, "agent.id")
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}

networkMetrics := map[string]util.MapStr{}
for key, item := range hostMetrics {
Expand Down Expand Up @@ -572,7 +577,7 @@ func (h *APIHandler) GetHostInfo(w http.ResponseWriter, req *http.Request, ps ht

}

func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, min, max int64, bucketSize int, metricItems []*common.MetricItem) map[string]*common.MetricItem {
func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, min, max int64, bucketSize int, metricItems []*common.MetricItem) (map[string]*common.MetricItem, error) {
var must = []util.MapStr{
{
"term": util.MapStr{
Expand Down Expand Up @@ -608,7 +613,7 @@ func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, mi
return h.getSingleMetrics(ctx, metricItems, query, bucketSize)
}

func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID string, min, max int64, bucketSize int, metricKey string) map[string]*common.MetricItem {
func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID string, min, max int64, bucketSize int, metricKey string) (map[string]*common.MetricItem, error) {
var must = []util.MapStr{
{
"term": util.MapStr{
Expand Down Expand Up @@ -725,7 +730,12 @@ func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Reque
ctx, cancel := context.WithTimeout(context.Background(), du)
defer cancel()
if hostInfo.AgentID == "" {
resBody["metrics"] = h.getSingleHostMetricFromNode(ctx, hostInfo.NodeID, min, max, bucketSize, key)
resBody["metrics"], err = h.getSingleHostMetricFromNode(ctx, hostInfo.NodeID, min, max, bucketSize, key)
if err != nil {
log.Error(err)
h.WriteError(w, err, http.StatusInternalServerError)
return
}
h.WriteJSON(w, resBody, http.StatusOK)
return
}
Expand Down Expand Up @@ -788,20 +798,30 @@ func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Reque
metricItem.AddLine("Disk Write Rate", "Disk Write Rate", "network write rate of host.", "group1", "payload.host.diskio_summary.write.bytes", "max", bucketSizeStr, "%", "bytes", "0,0.[00]", "0,0.[00]", false, true)
metricItems = append(metricItems, metricItem)
case DiskPartitionUsageMetricKey, NetworkInterfaceOutputRateMetricKey:
groupMetrics := h.getGroupHostMetrics(ctx, hostInfo.AgentID, min, max, bucketSize, key)
resBody["metrics"] = groupMetrics
resBody["metrics"] , err = h.getGroupHostMetrics(ctx, hostInfo.AgentID, min, max, bucketSize, key)
if err != nil {
log.Error(err)
h.WriteError(w, err, http.StatusInternalServerError)
return
}
h.WriteJSON(w, resBody, http.StatusOK)
return
}
hostMetrics := h.getSingleHostMetric(ctx, hostInfo.AgentID, min, max, bucketSize, metricItems)
hostMetrics, err := h.getSingleHostMetric(ctx, hostInfo.AgentID, min, max, bucketSize, metricItems)
if err != nil {
log.Error(err)
h.WriteError(w, err, http.StatusInternalServerError)
return
}

resBody["metrics"] = hostMetrics

h.WriteJSON(w, resBody, http.StatusOK)
}

func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, min, max int64, bucketSize int, metricKey string) map[string]*common.MetricItem {
func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, min, max int64, bucketSize int, metricKey string) (map[string]*common.MetricItem, error) {
var metrics = make(map[string]*common.MetricItem)
var err error
switch metricKey {
case DiskPartitionUsageMetricKey:
diskPartitionMetric := newMetricItem(DiskPartitionUsageMetricKey, 2, SystemGroupKey)
Expand All @@ -817,7 +837,7 @@ func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, mi
Units: "%",
},
}
metrics = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.disk_partition_usage.partition")
metrics, err = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.disk_partition_usage.partition")
case NetworkInterfaceOutputRateMetricKey:
networkOutputMetric := newMetricItem(NetworkInterfaceOutputRateMetricKey, 2, SystemGroupKey)
networkOutputMetric.AddAxi("Network interface output rate", "group1", common.PositionLeft, "bytes", "0.[0]", "0.[0]", 5, true)
Expand All @@ -832,13 +852,13 @@ func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, mi
Units: "",
},
}
metrics = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.network_interface.name")
metrics, err = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.network_interface.name")
}

return metrics
return metrics, err
}

func (h *APIHandler) getGroupHostMetric(ctx context.Context, agentIDs []string, min, max int64, bucketSize int, hostMetricItems []GroupMetricItem, groupField string) map[string]*common.MetricItem {
func (h *APIHandler) getGroupHostMetric(ctx context.Context, agentIDs []string, min, max int64, bucketSize int, hostMetricItems []GroupMetricItem, groupField string) (map[string]*common.MetricItem, error) {
var must = []util.MapStr{
{
"term": util.MapStr{
Expand Down
2 changes: 1 addition & 1 deletion modules/elastic/api/index_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu
},
},
}
return h.getMetrics(ctx, query, indexMetricItems, bucketSize), nil
return h.getMetrics(ctx, query, indexMetricItems, bucketSize)

}

Expand Down
Loading
Loading