From 37c42dff7e86db5c92bbb7051294d5ed17141b7d Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 12 Dec 2024 14:29:40 +0800 Subject: [PATCH 1/4] fix: empty index metrics in overview info api when collecting mode is agent --- modules/elastic/api/index_overview.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/modules/elastic/api/index_overview.go b/modules/elastic/api/index_overview.go index fc8c703b..56f91684 100644 --- a/modules/elastic/api/index_overview.go +++ b/modules/elastic/api/index_overview.go @@ -439,7 +439,12 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p }, { "term": util.MapStr{ - "metadata.labels.index_id": newIndexIDs[0], + "metadata.labels.cluster_id": firstClusterID, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.index_name": firstIndexName, }, }, }, From 7c9badcf96329cf80fd97977d6dca0c017faddb0 Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 12 Dec 2024 17:31:20 +0800 Subject: [PATCH 2/4] feat: add getting cluster monitor state api --- modules/elastic/api/cluster_overview.go | 72 ++++++++++++++++++++++++- modules/elastic/api/index_overview.go | 8 +-- modules/elastic/api/init.go | 1 + modules/elastic/api/manage.go | 4 +- modules/elastic/api/monitor_state.go | 11 ++-- modules/elastic/api/node_overview.go | 2 +- 6 files changed, 81 insertions(+), 17 deletions(-) diff --git a/modules/elastic/api/cluster_overview.go b/modules/elastic/api/cluster_overview.go index 57277a82..cdb42354 100644 --- a/modules/elastic/api/cluster_overview.go +++ b/modules/elastic/api/cluster_overview.go @@ -738,7 +738,7 @@ func (h *APIHandler) GetRealtimeClusterNodes(w http.ResponseWriter, req *http.Re func (h *APIHandler) GetClusterIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.ByName("id") - if GetMonitorState(id) == Console { + if GetMonitorState(id) == elastic.ModeAgentless { h.APIHandler.GetClusterIndices(w, req, ps) return } @@ -774,7 +774,7 @@ func (h *APIHandler) GetClusterIndices(w http.ResponseWriter, req *http.Request, func (h *APIHandler) GetRealtimeClusterIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { resBody := map[string]interface{}{} id := ps.ByName("id") - if GetMonitorState(id) == Console { + if GetMonitorState(id) == elastic.ModeAgentless { h.APIHandler.GetRealtimeClusterIndices(w, req, ps) return } @@ -1327,3 +1327,71 @@ func (h *APIHandler) SearchClusterMetadata(w http.ResponseWriter, req *http.Requ } w.Write(util.MustToJSONBytes(response)) } + +func (h *APIHandler) getClusterMonitorState(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.ByName("id") + collectionMode := GetMonitorState(id) + ret := util.MapStr{ + "cluster_id": id, + "metric_collection_mode": collectionMode, + } + queryDSL := util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.cluster_id": id, + }, + }, + { + "term": util.MapStr{ + "metadata.category": "elasticsearch", + }, + }, + }, + }, + }, + "size": 0, + "aggs": util.MapStr{ + "grp_name": util.MapStr{ + "terms": util.MapStr{ + "field": "metadata.name", + "size": 10, + }, + "aggs": util.MapStr{ + "max_timestamp": util.MapStr{ + "max": util.MapStr{ + "field": "timestamp", + }, + }, + }, + }, + }, + } + dsl := util.MustToJSONBytes(queryDSL) + response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).SearchWithRawQueryDSL(getAllMetricsIndex(), dsl) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + lastActiveAt := util.MapStr{} + for _, bk := range response.Aggregations["grp_name"].Buckets { + key := bk["key"].(string) + if tv, ok := bk["max_timestamp"].(map[string]interface{}); ok { + if collectionMode == elastic.ModeAgentless { + if util.StringInArray([]string{ "index_stats", "cluster_health", "cluster_stats", "node_stats"}, key) { + lastActiveAt[key] = tv["value"] + } + }else{ + if util.StringInArray([]string{ "shard_stats", "cluster_health", "cluster_stats", "node_stats"}, key) { + lastActiveAt[key] = tv["value"] + } + } + } + + } + ret["last_active_at"] = lastActiveAt + h.WriteJSON(w, ret, http.StatusOK) +} \ No newline at end of file diff --git a/modules/elastic/api/index_overview.go b/modules/elastic/api/index_overview.go index 56f91684..203628f2 100644 --- a/modules/elastic/api/index_overview.go +++ b/modules/elastic/api/index_overview.go @@ -303,7 +303,7 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p return } firstClusterID, firstIndexName = parts[0], parts[1] - if GetMonitorState(firstClusterID) == Console { + if GetMonitorState(firstClusterID) == elastic.ModeAgentless { h.APIHandler.FetchIndexInfo(w, ctx, indexIDs) return } @@ -580,7 +580,7 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p func (h *APIHandler) GetIndexInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { clusterID := ps.MustGetParameter("id") - if GetMonitorState(clusterID) == Console { + if GetMonitorState(clusterID) == elastic.ModeAgentless { h.APIHandler.GetIndexInfo(w, req, ps) return } @@ -701,7 +701,7 @@ func (h *APIHandler) GetIndexInfo(w http.ResponseWriter, req *http.Request, ps h func (h *APIHandler) GetIndexShards(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { clusterID := ps.MustGetParameter("id") - if GetMonitorState(clusterID) == Console { + if GetMonitorState(clusterID) == elastic.ModeAgentless { h.APIHandler.GetIndexShards(w, req, ps) return } @@ -810,7 +810,7 @@ func (h *APIHandler) GetIndexShards(w http.ResponseWriter, req *http.Request, ps func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { clusterID := ps.MustGetParameter("id") - if GetMonitorState(clusterID) == Console { + if GetMonitorState(clusterID) == elastic.ModeAgentless { h.APIHandler.GetSingleIndexMetrics(w, req, ps) return } diff --git a/modules/elastic/api/init.go b/modules/elastic/api/init.go index 58a804b2..45092bc9 100644 --- a/modules/elastic/api/init.go +++ b/modules/elastic/api/init.go @@ -86,6 +86,7 @@ func init() { api.HandleAPIMethod(api.POST, "/elasticsearch/cluster/info", clusterAPI.RequirePermission(clusterAPI.FetchClusterInfo, enum.PermissionElasticsearchMetricRead)) api.HandleAPIMethod(api.GET, "/elasticsearch/:id/info", clusterAPI.RequireClusterPermission(clusterAPI.RequirePermission(clusterAPI.GetClusterInfo, enum.PermissionElasticsearchMetricRead))) + api.HandleAPIMethod(api.GET, "/elasticsearch/:id/monitor_state", clusterAPI.RequireClusterPermission(clusterAPI.RequirePermission(clusterAPI.getClusterMonitorState, enum.PermissionElasticsearchMetricRead))) api.HandleAPIMethod(api.POST, "/elasticsearch/node/_search", clusterAPI.RequirePermission(clusterAPI.SearchNodeMetadata, enum.PermissionElasticsearchNodeRead)) api.HandleAPIMethod(api.GET, "/elasticsearch/:id/nodes", clusterAPI.RequireClusterPermission(clusterAPI.RequirePermission(clusterAPI.GetClusterNodes, enum.PermissionElasticsearchMetricRead, enum.PermissionElasticsearchNodeRead))) api.HandleAPIMethod(api.GET, "/elasticsearch/:id/nodes/realtime", clusterAPI.RequireClusterPermission(clusterAPI.RequirePermission(clusterAPI.GetRealtimeClusterNodes, enum.PermissionElasticsearchMetricRead))) diff --git a/modules/elastic/api/manage.go b/modules/elastic/api/manage.go index 4dc38a1c..dacc4148 100644 --- a/modules/elastic/api/manage.go +++ b/modules/elastic/api/manage.go @@ -528,7 +528,7 @@ func (h *APIHandler) HandleMetricsSummaryAction(w http.ResponseWriter, req *http func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { resBody := map[string]interface{}{} id := ps.ByName("id") - if GetMonitorState(id) == Console { + if GetMonitorState(id) == elastic.ModeAgentless { h.APIHandler.HandleClusterMetricsAction(w, req, ps) return } @@ -626,7 +626,7 @@ func (h *APIHandler) HandleNodeMetricsAction(w http.ResponseWriter, req *http.Re func (h *APIHandler) HandleIndexMetricsAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { resBody := map[string]interface{}{} id := ps.ByName("id") - if GetMonitorState(id) == Console { + if GetMonitorState(id) == elastic.ModeAgentless { h.APIHandler.HandleIndexMetricsAction(w, req, ps) return } diff --git a/modules/elastic/api/monitor_state.go b/modules/elastic/api/monitor_state.go index 67157818..5461c257 100644 --- a/modules/elastic/api/monitor_state.go +++ b/modules/elastic/api/monitor_state.go @@ -32,18 +32,13 @@ import ( "infini.sh/framework/core/elastic" ) -type MonitorState int -const ( - Console MonitorState = iota - Agent -) -func GetMonitorState(clusterID string) MonitorState { +func GetMonitorState(clusterID string) string { conf := elastic.GetConfig(clusterID) if conf == nil { panic(fmt.Errorf("config of cluster [%s] is not found", clusterID)) } if conf.MonitorConfigs != nil && !conf.MonitorConfigs.NodeStats.Enabled && !conf.MonitorConfigs.IndexStats.Enabled { - return Agent + return elastic.ModeAgent } - return Console + return elastic.ModeAgentless } diff --git a/modules/elastic/api/node_overview.go b/modules/elastic/api/node_overview.go index b9ec3f51..913f362d 100644 --- a/modules/elastic/api/node_overview.go +++ b/modules/elastic/api/node_overview.go @@ -1276,7 +1276,7 @@ func (h *APIHandler) getLatestIndices(req *http.Request, min string, max string, func (h *APIHandler) GetNodeShards(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { clusterID := ps.MustGetParameter("id") - if GetMonitorState(clusterID) == Console { + if GetMonitorState(clusterID) == elastic.ModeAgentless { h.APIHandler.GetNodeShards(w, req, ps) return } From a44db633761714edb378175303c86d43bdd6f0e3 Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 12 Dec 2024 21:01:52 +0800 Subject: [PATCH 3/4] chore: updating monitor state api --- modules/elastic/api/cluster_overview.go | 22 ++++++++++++++++++---- modules/elastic/api/init.go | 2 +- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/modules/elastic/api/cluster_overview.go b/modules/elastic/api/cluster_overview.go index cdb42354..87f4d4c2 100644 --- a/modules/elastic/api/cluster_overview.go +++ b/modules/elastic/api/cluster_overview.go @@ -1376,22 +1376,36 @@ func (h *APIHandler) getClusterMonitorState(w http.ResponseWriter, req *http.Req h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - lastActiveAt := util.MapStr{} for _, bk := range response.Aggregations["grp_name"].Buckets { key := bk["key"].(string) if tv, ok := bk["max_timestamp"].(map[string]interface{}); ok { if collectionMode == elastic.ModeAgentless { if util.StringInArray([]string{ "index_stats", "cluster_health", "cluster_stats", "node_stats"}, key) { - lastActiveAt[key] = tv["value"] + ret[key] = getCollectionStats(tv["value"]) } }else{ if util.StringInArray([]string{ "shard_stats", "cluster_health", "cluster_stats", "node_stats"}, key) { - lastActiveAt[key] = tv["value"] + ret[key] = getCollectionStats(tv["value"]) } } } } - ret["last_active_at"] = lastActiveAt h.WriteJSON(w, ret, http.StatusOK) +} + +func getCollectionStats(lastActiveAt interface{}) util.MapStr { + stats := util.MapStr{ + "last_active_at": lastActiveAt, + "status": "active", + } + if timestamp, ok := lastActiveAt.(float64); ok { + t := time.Unix(int64(timestamp/1000), 0) + if time.Now().Sub(t) > 5 * time.Minute { + stats["status"] = "warning" + }else{ + stats["status"] = "ok" + } + } + return stats } \ No newline at end of file diff --git a/modules/elastic/api/init.go b/modules/elastic/api/init.go index 45092bc9..661315f4 100644 --- a/modules/elastic/api/init.go +++ b/modules/elastic/api/init.go @@ -86,7 +86,7 @@ func init() { api.HandleAPIMethod(api.POST, "/elasticsearch/cluster/info", clusterAPI.RequirePermission(clusterAPI.FetchClusterInfo, enum.PermissionElasticsearchMetricRead)) api.HandleAPIMethod(api.GET, "/elasticsearch/:id/info", clusterAPI.RequireClusterPermission(clusterAPI.RequirePermission(clusterAPI.GetClusterInfo, enum.PermissionElasticsearchMetricRead))) - api.HandleAPIMethod(api.GET, "/elasticsearch/:id/monitor_state", clusterAPI.RequireClusterPermission(clusterAPI.RequirePermission(clusterAPI.getClusterMonitorState, enum.PermissionElasticsearchMetricRead))) + api.HandleAPIMethod(api.GET, "/elasticsearch/:id/_collection_stats", clusterAPI.RequireClusterPermission(clusterAPI.RequirePermission(clusterAPI.getClusterMonitorState, enum.PermissionElasticsearchMetricRead))) api.HandleAPIMethod(api.POST, "/elasticsearch/node/_search", clusterAPI.RequirePermission(clusterAPI.SearchNodeMetadata, enum.PermissionElasticsearchNodeRead)) api.HandleAPIMethod(api.GET, "/elasticsearch/:id/nodes", clusterAPI.RequireClusterPermission(clusterAPI.RequirePermission(clusterAPI.GetClusterNodes, enum.PermissionElasticsearchMetricRead, enum.PermissionElasticsearchNodeRead))) api.HandleAPIMethod(api.GET, "/elasticsearch/:id/nodes/realtime", clusterAPI.RequireClusterPermission(clusterAPI.RequirePermission(clusterAPI.GetRealtimeClusterNodes, enum.PermissionElasticsearchMetricRead))) From c6af84c0d816e69f42a640a06152bc7afddc2f7b Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 13 Dec 2024 12:03:35 +0800 Subject: [PATCH 4/4] chore: custom request timeout error with quering metrics --- core/auth.go | 16 +++++ core/errors/errors.go | 83 +++++++++++++++++++++++ modules/elastic/api/cluster_overview.go | 13 +++- modules/elastic/api/host.go | 44 ++++++++---- modules/elastic/api/index_metrics.go | 2 +- modules/elastic/api/index_overview.go | 14 +++- modules/elastic/api/manage.go | 46 +++++++------ modules/elastic/api/metrics_util.go | 37 +++++----- modules/elastic/api/node_metrics.go | 2 +- modules/elastic/api/node_overview.go | 14 +++- modules/elastic/api/threadpool_metrics.go | 2 +- modules/elastic/api/v1/index_metrics.go | 9 ++- modules/elastic/api/v1/index_overview.go | 14 +++- modules/elastic/api/v1/manage.go | 59 ++++++++++------ modules/elastic/api/v1/metrics_util.go | 17 ++--- modules/elastic/api/v1/node_overview.go | 16 ++++- 16 files changed, 289 insertions(+), 99 deletions(-) create mode 100644 core/errors/errors.go diff --git a/core/auth.go b/core/auth.go index e9804210..e88d89fe 100644 --- a/core/auth.go +++ b/core/auth.go @@ -24,6 +24,10 @@ package core import ( + "context" + "errors" + "fmt" + cerr "infini.sh/console/core/errors" "infini.sh/console/core/security" "infini.sh/framework/core/api" httprouter "infini.sh/framework/core/api/router" @@ -37,6 +41,18 @@ type Handler struct { api.Handler } +func (handler Handler) WriteError(w http.ResponseWriter, err interface{}, status int) { + if v, ok := err.(error); ok { + if errors.Is(v, context.DeadlineExceeded) { + handler.Handler.WriteError(w, cerr.New(cerr.ErrTypeRequestTimeout, "", err).Error(), status) + return + } + handler.Handler.WriteError(w, v.Error(), status) + return + } + handler.Handler.WriteError(w, fmt.Sprintf("%v", err), status) +} + func (handler Handler) RequireLogin(h httprouter.Handle) httprouter.Handle { return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { diff --git a/core/errors/errors.go b/core/errors/errors.go new file mode 100644 index 00000000..6b685152 --- /dev/null +++ b/core/errors/errors.go @@ -0,0 +1,83 @@ +// Copyright (C) INFINI Labs & INFINI LIMITED. +// +// The INFINI Console is offered under the GNU Affero General Public License v3.0 +// and as commercial software. +// +// For commercial licensing, contact us at: +// - Website: infinilabs.com +// - Email: hello@infini.ltd +// +// Open Source licensed under AGPL V3: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package errors + +import ( + "fmt" + "infini.sh/framework/core/errors" +) + +const ( + ErrTypeRequestParams = "request_params_error" + ErrTypeApplication = "application_error" + ErrTypeAlreadyExists = "already_exists_error" + ErrTypeNotExists = "not_exists_error" + ErrTypeIncorrectPassword = "incorrect_password_error" + ErrTypeDomainPrefixMismatch = "domain_prefix_mismatch_error" + ErrTypeDisabled = "disabled_error" + ErrTypeRequestTimeout = "request_timeout_error" +) + +var ( + ErrPasswordIncorrect = errors.New("incorrect password") + ErrNotExistsErr = errors.New("not exists") +) + +type Error struct { + typ string + msg interface{} + field string +} + +func (err Error) Error() string { + return fmt.Sprintf("%s:%v: %v", err.typ, err.field, err.msg) +} + +//NewAppError returns an application error +func NewAppError(msg any) *Error { + return New(ErrTypeApplication, "", msg) +} + +//NewParamsError returns a request params error +func NewParamsError(field string, msg any) *Error { + return New(ErrTypeRequestParams, field, msg) +} + +//NewAlreadyExistsError returns an already exists error +func NewAlreadyExistsError(field string, msg any) *Error { + return New(ErrTypeAlreadyExists, field, msg) +} + +//NewNotExistsError returns a not exists error +func NewNotExistsError(field string, msg any) *Error { + return New(ErrTypeNotExists, field, msg) +} + +func New(typ string, field string, msg any) *Error { + return &Error{ + typ, + msg, + field, + } +} \ No newline at end of file diff --git a/modules/elastic/api/cluster_overview.go b/modules/elastic/api/cluster_overview.go index 87f4d4c2..cca784ff 100644 --- a/modules/elastic/api/cluster_overview.go +++ b/modules/elastic/api/cluster_overview.go @@ -25,7 +25,9 @@ package api import ( "context" + "errors" "fmt" + cerr "infini.sh/console/core/errors" "infini.sh/framework/modules/elastic/adapter" "net/http" "strings" @@ -263,7 +265,16 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request, } ctx, cancel := context.WithTimeout(context.Background(), du) defer cancel() - indexMetrics := h.getMetrics(ctx, query, indexMetricItems, bucketSize) + indexMetrics, err := h.getMetrics(ctx, query, indexMetricItems, bucketSize) + if err != nil { + log.Error(err) + if errors.Is(err, context.DeadlineExceeded) { + h.WriteError(w, cerr.New(cerr.ErrTypeRequestTimeout, "", err).Error(), http.StatusRequestTimeout) + return + } + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } indexingMetricData := util.MapStr{} for _, line := range indexMetrics["cluster_indexing"].Lines { // remove first metric dot diff --git a/modules/elastic/api/host.go b/modules/elastic/api/host.go index 69e9701f..9cdb5adf 100644 --- a/modules/elastic/api/host.go +++ b/modules/elastic/api/host.go @@ -495,7 +495,12 @@ func (h *APIHandler) FetchHostInfo(w http.ResponseWriter, req *http.Request, ps Units: "/s", }, } - hostMetrics := h.getGroupHostMetric(context.Background(), agentIDs, min, max, bucketSize, hostMetricItems, "agent.id") + hostMetrics, err := h.getGroupHostMetric(context.Background(), agentIDs, min, max, bucketSize, hostMetricItems, "agent.id") + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } networkMetrics := map[string]util.MapStr{} for key, item := range hostMetrics { @@ -572,7 +577,7 @@ func (h *APIHandler) GetHostInfo(w http.ResponseWriter, req *http.Request, ps ht } -func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, min, max int64, bucketSize int, metricItems []*common.MetricItem) map[string]*common.MetricItem { +func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, min, max int64, bucketSize int, metricItems []*common.MetricItem) (map[string]*common.MetricItem, error) { var must = []util.MapStr{ { "term": util.MapStr{ @@ -608,7 +613,7 @@ func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, mi return h.getSingleMetrics(ctx, metricItems, query, bucketSize) } -func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID string, min, max int64, bucketSize int, metricKey string) map[string]*common.MetricItem { +func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID string, min, max int64, bucketSize int, metricKey string) (map[string]*common.MetricItem, error) { var must = []util.MapStr{ { "term": util.MapStr{ @@ -725,7 +730,12 @@ func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Reque ctx, cancel := context.WithTimeout(context.Background(), du) defer cancel() if hostInfo.AgentID == "" { - resBody["metrics"] = h.getSingleHostMetricFromNode(ctx, hostInfo.NodeID, min, max, bucketSize, key) + resBody["metrics"], err = h.getSingleHostMetricFromNode(ctx, hostInfo.NodeID, min, max, bucketSize, key) + if err != nil { + log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + return + } h.WriteJSON(w, resBody, http.StatusOK) return } @@ -788,20 +798,30 @@ func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Reque metricItem.AddLine("Disk Write Rate", "Disk Write Rate", "network write rate of host.", "group1", "payload.host.diskio_summary.write.bytes", "max", bucketSizeStr, "%", "bytes", "0,0.[00]", "0,0.[00]", false, true) metricItems = append(metricItems, metricItem) case DiskPartitionUsageMetricKey, NetworkInterfaceOutputRateMetricKey: - groupMetrics := h.getGroupHostMetrics(ctx, hostInfo.AgentID, min, max, bucketSize, key) - resBody["metrics"] = groupMetrics + resBody["metrics"] , err = h.getGroupHostMetrics(ctx, hostInfo.AgentID, min, max, bucketSize, key) + if err != nil { + log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + return + } h.WriteJSON(w, resBody, http.StatusOK) return } - hostMetrics := h.getSingleHostMetric(ctx, hostInfo.AgentID, min, max, bucketSize, metricItems) + hostMetrics, err := h.getSingleHostMetric(ctx, hostInfo.AgentID, min, max, bucketSize, metricItems) + if err != nil { + log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + return + } resBody["metrics"] = hostMetrics h.WriteJSON(w, resBody, http.StatusOK) } -func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, min, max int64, bucketSize int, metricKey string) map[string]*common.MetricItem { +func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, min, max int64, bucketSize int, metricKey string) (map[string]*common.MetricItem, error) { var metrics = make(map[string]*common.MetricItem) + var err error switch metricKey { case DiskPartitionUsageMetricKey: diskPartitionMetric := newMetricItem(DiskPartitionUsageMetricKey, 2, SystemGroupKey) @@ -817,7 +837,7 @@ func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, mi Units: "%", }, } - metrics = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.disk_partition_usage.partition") + metrics, err = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.disk_partition_usage.partition") case NetworkInterfaceOutputRateMetricKey: networkOutputMetric := newMetricItem(NetworkInterfaceOutputRateMetricKey, 2, SystemGroupKey) networkOutputMetric.AddAxi("Network interface output rate", "group1", common.PositionLeft, "bytes", "0.[0]", "0.[0]", 5, true) @@ -832,13 +852,13 @@ func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, mi Units: "", }, } - metrics = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.network_interface.name") + metrics, err = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.network_interface.name") } - return metrics + return metrics, err } -func (h *APIHandler) getGroupHostMetric(ctx context.Context, agentIDs []string, min, max int64, bucketSize int, hostMetricItems []GroupMetricItem, groupField string) map[string]*common.MetricItem { +func (h *APIHandler) getGroupHostMetric(ctx context.Context, agentIDs []string, min, max int64, bucketSize int, hostMetricItems []GroupMetricItem, groupField string) (map[string]*common.MetricItem, error) { var must = []util.MapStr{ { "term": util.MapStr{ diff --git a/modules/elastic/api/index_metrics.go b/modules/elastic/api/index_metrics.go index 2638f2c3..f3002596 100644 --- a/modules/elastic/api/index_metrics.go +++ b/modules/elastic/api/index_metrics.go @@ -742,7 +742,7 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu }, }, } - return h.getMetrics(ctx, query, indexMetricItems, bucketSize), nil + return h.getMetrics(ctx, query, indexMetricItems, bucketSize) } diff --git a/modules/elastic/api/index_overview.go b/modules/elastic/api/index_overview.go index 203628f2..fb9bc53b 100644 --- a/modules/elastic/api/index_overview.go +++ b/modules/elastic/api/index_overview.go @@ -531,7 +531,11 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p }, }, } - metrics := h.getMetrics(ctx, query, nodeMetricItems, bucketSize) + metrics, err := h.getMetrics(ctx, query, nodeMetricItems, bucketSize) + if err != nil { + log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + } indexMetrics := map[string]util.MapStr{} for key, item := range metrics { for _, line := range item.Lines { @@ -921,6 +925,8 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ healthMetric, err := h.GetIndexHealthMetric(ctx, clusterID, indexName, min, max, bucketSize) if err != nil { log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + return } metrics["index_health"] = healthMetric } else { @@ -987,7 +993,11 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ } metricItems = append(metricItems, metricItem) } - metrics = h.getSingleIndexMetrics(context.Background(), metricItems, query, bucketSize) + metrics, err = h.getSingleIndexMetrics(context.Background(), metricItems, query, bucketSize) + if err != nil { + log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + } } resBody["metrics"] = metrics diff --git a/modules/elastic/api/manage.go b/modules/elastic/api/manage.go index dacc4148..cd83e67b 100644 --- a/modules/elastic/api/manage.go +++ b/modules/elastic/api/manage.go @@ -561,9 +561,14 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http ctx, cancel := context.WithTimeout(context.Background(), du) defer cancel() if util.StringInArray([]string{v1.IndexThroughputMetricKey, v1.SearchThroughputMetricKey, v1.IndexLatencyMetricKey, v1.SearchLatencyMetricKey}, key) { - metrics = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, key) + metrics, err = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, key) }else{ - metrics = h.GetClusterMetrics(ctx, id, bucketSize, min, max, key) + metrics, err = h.GetClusterMetrics(ctx, id, bucketSize, min, max, key) + } + if err != nil { + log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + return } resBody["metrics"] = metrics @@ -603,7 +608,7 @@ func (h *APIHandler) HandleNodeMetricsAction(w http.ResponseWriter, req *http.Re resBody["metrics"], err = h.getNodeMetrics(ctx, id, bucketSize, min, max, nodeName, top, key) if err != nil { log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) + h.WriteError(w, err, http.StatusInternalServerError) return } ver := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).GetVersion() @@ -903,35 +908,36 @@ const ( CircuitBreakerMetricKey = "circuit_breaker" ) -func (h *APIHandler) GetClusterMetrics(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) map[string]*common.MetricItem { +func (h *APIHandler) GetClusterMetrics(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) (map[string]*common.MetricItem, error) { - var clusterMetricsResult = map[string]*common.MetricItem {} + var ( + clusterMetricsResult = map[string]*common.MetricItem {} + err error + ) switch metricKey { case ClusterDocumentsMetricKey, ClusterStorageMetricKey, ClusterIndicesMetricKey, ClusterNodeCountMetricKey: - clusterMetricsResult = h.getClusterMetricsByKey(ctx, id, bucketSize, min, max, metricKey) + clusterMetricsResult, err = h.getClusterMetricsByKey(ctx, id, bucketSize, min, max, metricKey) case v1.IndexLatencyMetricKey, v1.IndexThroughputMetricKey, v1.SearchThroughputMetricKey, v1.SearchLatencyMetricKey: - clusterMetricsResult = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, metricKey) + clusterMetricsResult, err = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, metricKey) case ClusterHealthMetricKey: - statusMetric, err := h.getClusterStatusMetric(ctx, id, min, max, bucketSize) + var statusMetric *common.MetricItem + statusMetric, err = h.getClusterStatusMetric(ctx, id, min, max, bucketSize) if err == nil { clusterMetricsResult[ClusterHealthMetricKey] = statusMetric - } else { - log.Error("get cluster status metric error: ", err) } case ShardCountMetricKey: - clusterMetricsResult = h.getShardsMetric(ctx, id, min, max, bucketSize) + clusterMetricsResult, err = h.getShardsMetric(ctx, id, min, max, bucketSize) case CircuitBreakerMetricKey: - clusterMetricsResult = h.getCircuitBreakerMetric(ctx, id, min, max, bucketSize) + clusterMetricsResult, err = h.getCircuitBreakerMetric(ctx, id, min, max, bucketSize) } - - return clusterMetricsResult + return clusterMetricsResult, err } -func (h *APIHandler) getClusterMetricsByKey(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) map[string]*common.MetricItem { +func (h *APIHandler) getClusterMetricsByKey(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) (map[string]*common.MetricItem, error) { bucketSizeStr := fmt.Sprintf("%vs", bucketSize) clusterMetricItems := []*common.MetricItem{} @@ -964,7 +970,7 @@ func (h *APIHandler) getClusterMetricsByKey(ctx context.Context, id string, buck meta := elastic.GetMetadata(id) if meta == nil { err := fmt.Errorf("metadata of cluster [%s] is not found", id) - panic(err) + return nil, err } majorVersion := meta.GetMajorVersion() @@ -1024,7 +1030,7 @@ func (h *APIHandler) getClusterMetricsByKey(ctx context.Context, id string, buck return h.getSingleMetrics(ctx, clusterMetricItems, query, bucketSize) } -func (h *APIHandler) GetClusterIndexMetrics(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) map[string]*common.MetricItem { +func (h *APIHandler) GetClusterIndexMetrics(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) (map[string]*common.MetricItem, error) { bucketSizeStr := fmt.Sprintf("%vs", bucketSize) metricItems := []*common.MetricItem{} switch metricKey { @@ -1081,7 +1087,7 @@ func (h *APIHandler) GetClusterIndexMetrics(ctx context.Context, id string, buck query := map[string]interface{}{} clusterUUID, err := adapter.GetClusterUUID(id) if err != nil { - panic(err) + return nil, err } query["query"] = util.MapStr{ "bool": util.MapStr{ @@ -1123,7 +1129,7 @@ func (h *APIHandler) GetClusterIndexMetrics(ctx context.Context, id string, buck return h.getSingleIndexMetricsByNodeStats(ctx, metricItems, query, bucketSize) } -func (h *APIHandler) getShardsMetric(ctx context.Context, id string, min, max int64, bucketSize int) map[string]*common.MetricItem { +func (h *APIHandler) getShardsMetric(ctx context.Context, id string, min, max int64, bucketSize int) (map[string]*common.MetricItem, error) { bucketSizeStr := fmt.Sprintf("%vs", bucketSize) query := util.MapStr{ "query": util.MapStr{ @@ -1185,7 +1191,7 @@ func (h *APIHandler) getShardsMetric(ctx context.Context, id string, min, max in return h.getSingleMetrics(ctx, clusterHealthMetrics, query, bucketSize) } -func (h *APIHandler) getCircuitBreakerMetric(ctx context.Context, id string, min, max int64, bucketSize int) map[string]*common.MetricItem { +func (h *APIHandler) getCircuitBreakerMetric(ctx context.Context, id string, min, max int64, bucketSize int) (map[string]*common.MetricItem, error) { bucketSizeStr := fmt.Sprintf("%vs", bucketSize) query := util.MapStr{ "query": util.MapStr{ diff --git a/modules/elastic/api/metrics_util.go b/modules/elastic/api/metrics_util.go index 7279c5de..672e7b3e 100644 --- a/modules/elastic/api/metrics_util.go +++ b/modules/elastic/api/metrics_util.go @@ -110,13 +110,12 @@ func generateGroupAggs(nodeMetricItems []GroupMetricItem) map[string]interface{} return aggs } -func (h *APIHandler) getMetrics(ctx context.Context, query map[string]interface{}, grpMetricItems []GroupMetricItem, bucketSize int) map[string]*common.MetricItem { +func (h *APIHandler) getMetrics(ctx context.Context, query map[string]interface{}, grpMetricItems []GroupMetricItem, bucketSize int) (map[string]*common.MetricItem, error) { bucketSizeStr := fmt.Sprintf("%vs", bucketSize) queryDSL := util.MustToJSONBytes(query) response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).QueryDSL(ctx, getAllMetricsIndex(),nil, queryDSL) if err != nil { - log.Error(err) - panic(err) + return nil, err } grpMetricItemsIndex := map[string]int{} for i, item := range grpMetricItems { @@ -146,9 +145,9 @@ func (h *APIHandler) getMetrics(ctx context.Context, query map[string]interface{ if bucketMap, ok := dateBucket.(map[string]interface{}); ok { v, ok := bucketMap["key"].(float64) if !ok { - panic("invalid bucket key") + return nil, fmt.Errorf("invalid bucket key type: %T", bucketMap["key"]) } - dateTime := (int64(v)) + dateTime := int64(v) minDate = util.MinInt64(minDate, dateTime) maxDate = util.MaxInt64(maxDate, dateTime) @@ -219,7 +218,7 @@ func (h *APIHandler) getMetrics(ctx context.Context, query map[string]interface{ metricItem.MetricItem.Request = string(queryDSL) result[metricItem.Key] = metricItem.MetricItem } - return result + return result, nil } func GetMinBucketSize() int { @@ -340,7 +339,7 @@ func GetMetricRangeAndBucketSize(minStr string, maxStr string, bucketSize int, m } // 获取单个指标,可以包含多条曲线 -func (h *APIHandler) getSingleMetrics(ctx context.Context, metricItems []*common.MetricItem, query map[string]interface{}, bucketSize int) map[string]*common.MetricItem { +func (h *APIHandler) getSingleMetrics(ctx context.Context, metricItems []*common.MetricItem, query map[string]interface{}, bucketSize int) (map[string]*common.MetricItem, error) { metricData := map[string][][]interface{}{} aggs := map[string]interface{}{} @@ -386,8 +385,7 @@ func (h *APIHandler) getSingleMetrics(ctx context.Context, metricItems []*common clusterID := global.MustLookupString(elastic.GlobalSystemElasticsearchID) intervalField, err := getDateHistogramIntervalField(clusterID, bucketSizeStr) if err != nil { - log.Error(err) - panic(err) + return nil, err } query["size"] = 0 query["aggs"] = util.MapStr{ @@ -402,8 +400,7 @@ func (h *APIHandler) getSingleMetrics(ctx context.Context, metricItems []*common queryDSL := util.MustToJSONBytes(query) response, err := elastic.GetClient(clusterID).QueryDSL(ctx, getAllMetricsIndex(), nil, queryDSL) if err != nil { - log.Error(err) - panic(err) + return nil, err } var minDate, maxDate int64 @@ -475,7 +472,7 @@ func (h *APIHandler) getSingleMetrics(ctx context.Context, metricItems []*common result[metricItem.Key] = metricItem } - return result + return result, nil } //func (h *APIHandler) executeQuery(query map[string]interface{}, bucketItems *[]common.BucketItem, bucketSize int) map[string]*common.MetricItem { @@ -964,7 +961,7 @@ func parseGroupMetricData(buckets []elastic.BucketBase, isPercent bool) ([]inter return metricData, nil } -func (h *APIHandler) getSingleIndexMetricsByNodeStats(ctx context.Context, metricItems []*common.MetricItem, query map[string]interface{}, bucketSize int) map[string]*common.MetricItem { +func (h *APIHandler) getSingleIndexMetricsByNodeStats(ctx context.Context, metricItems []*common.MetricItem, query map[string]interface{}, bucketSize int) (map[string]*common.MetricItem, error) { metricData := map[string][][]interface{}{} aggs := util.MapStr{} @@ -1034,7 +1031,7 @@ func (h *APIHandler) getSingleIndexMetricsByNodeStats(ctx context.Context, metri clusterID := global.MustLookupString(elastic.GlobalSystemElasticsearchID) intervalField, err := getDateHistogramIntervalField(clusterID, bucketSizeStr) if err != nil { - panic(err) + return nil, err } query["size"] = 0 query["aggs"] = util.MapStr{ @@ -1049,7 +1046,7 @@ func (h *APIHandler) getSingleIndexMetricsByNodeStats(ctx context.Context, metri return parseSingleIndexMetrics(ctx, clusterID, metricItems, query, bucketSize,metricData, metricItemsMap) } -func (h *APIHandler) getSingleIndexMetrics(ctx context.Context, metricItems []*common.MetricItem, query map[string]interface{}, bucketSize int) map[string]*common.MetricItem { +func (h *APIHandler) getSingleIndexMetrics(ctx context.Context, metricItems []*common.MetricItem, query map[string]interface{}, bucketSize int) (map[string]*common.MetricItem, error) { metricData := map[string][][]interface{}{} aggs := util.MapStr{} @@ -1154,11 +1151,11 @@ func (h *APIHandler) getSingleIndexMetrics(ctx context.Context, metricItems []*c return parseSingleIndexMetrics(ctx, clusterID, metricItems, query, bucketSize,metricData, metricItemsMap) } -func parseSingleIndexMetrics(ctx context.Context, clusterID string, metricItems []*common.MetricItem, query map[string]interface{}, bucketSize int, metricData map[string][][]interface{}, metricItemsMap map[string]*common.MetricLine) map[string]*common.MetricItem { +func parseSingleIndexMetrics(ctx context.Context, clusterID string, metricItems []*common.MetricItem, query map[string]interface{}, bucketSize int, metricData map[string][][]interface{}, metricItemsMap map[string]*common.MetricLine) (map[string]*common.MetricItem, error) { queryDSL := util.MustToJSONBytes(query) response, err := elastic.GetClient(clusterID).QueryDSL(ctx, getAllMetricsIndex(), nil, util.MustToJSONBytes(query)) if err != nil { - panic(err) + return nil, err } var minDate, maxDate int64 @@ -1167,9 +1164,9 @@ func parseSingleIndexMetrics(ctx context.Context, clusterID string, metricItems for _, bucket := range v.Buckets { v, ok := bucket["key"].(float64) if !ok { - panic("invalid bucket key") + return nil, fmt.Errorf("invalid bucket key type: %T", bucket["key"]) } - dateTime := (int64(v)) + dateTime := int64(v) minDate = util.MinInt64(minDate, dateTime) maxDate = util.MaxInt64(maxDate, dateTime) for mk1, mv1 := range metricData { @@ -1230,5 +1227,5 @@ func parseSingleIndexMetrics(ctx context.Context, clusterID string, metricItems result[metricItem.Key] = metricItem } - return result + return result, nil } \ No newline at end of file diff --git a/modules/elastic/api/node_metrics.go b/modules/elastic/api/node_metrics.go index 8688076f..f3ddc425 100644 --- a/modules/elastic/api/node_metrics.go +++ b/modules/elastic/api/node_metrics.go @@ -1122,7 +1122,7 @@ func (h *APIHandler) getNodeMetrics(ctx context.Context, clusterID string, bucke }, }, } - return h.getMetrics(ctx, query, nodeMetricItems, bucketSize), nil + return h.getMetrics(ctx, query, nodeMetricItems, bucketSize) } diff --git a/modules/elastic/api/node_overview.go b/modules/elastic/api/node_overview.go index 913f362d..0f6e0b70 100644 --- a/modules/elastic/api/node_overview.go +++ b/modules/elastic/api/node_overview.go @@ -422,7 +422,12 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps }, }, } - metrics := h.getMetrics(ctx, query, nodeMetricItems, bucketSize) + metrics, err := h.getMetrics(ctx, query, nodeMetricItems, bucketSize) + if err != nil { + log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + return + } indexMetrics := map[string]util.MapStr{} for key, item := range metrics { for _, line := range item.Lines { @@ -787,7 +792,12 @@ func (h *APIHandler) GetSingleNodeMetrics(w http.ResponseWriter, req *http.Reque metricItems=append(metricItems,metricItem) } - metrics = h.getSingleMetrics(ctx, metricItems,query, bucketSize) + metrics, err = h.getSingleMetrics(ctx, metricItems,query, bucketSize) + if err != nil { + log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + return + } } resBody["metrics"] = metrics diff --git a/modules/elastic/api/threadpool_metrics.go b/modules/elastic/api/threadpool_metrics.go index f385fe3f..93d1e0f3 100644 --- a/modules/elastic/api/threadpool_metrics.go +++ b/modules/elastic/api/threadpool_metrics.go @@ -636,5 +636,5 @@ func (h *APIHandler) getThreadPoolMetrics(ctx context.Context, clusterID string, }, }, } - return h.getMetrics(ctx, query, queueMetricItems, bucketSize), nil + return h.getMetrics(ctx, query, queueMetricItems, bucketSize) } diff --git a/modules/elastic/api/v1/index_metrics.go b/modules/elastic/api/v1/index_metrics.go index 84392b16..070b4161 100644 --- a/modules/elastic/api/v1/index_metrics.go +++ b/modules/elastic/api/v1/index_metrics.go @@ -75,7 +75,7 @@ const ( DocPercentMetricKey = "doc_percent" ) -func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clusterID string, bucketSize int, min, max int64, indexName string, top int, metricKey string) map[string]*common.MetricItem{ +func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clusterID string, bucketSize int, min, max int64, indexName string, top int, metricKey string) (map[string]*common.MetricItem, error){ bucketSizeStr:=fmt.Sprintf("%vs",bucketSize) var must = []util.MapStr{ @@ -109,7 +109,7 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu indexNames = strings.Split(indexName, ",") allowedIndices, hasAllPrivilege := h.GetAllowedIndices(req, clusterID) if !hasAllPrivilege && len(allowedIndices) == 0 { - return nil + return nil, nil } if !hasAllPrivilege{ namePattern := radix.Compile(allowedIndices...) @@ -120,7 +120,7 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu } } if len(filterNames) == 0 { - return nil + return nil, nil } indexNames = filterNames } @@ -665,8 +665,7 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu } intervalField, err := getDateHistogramIntervalField(global.MustLookupString(elastic.GlobalSystemElasticsearchID), bucketSizeStr) if err != nil { - log.Error(err) - panic(err) + return nil, err } query["size"]=0 diff --git a/modules/elastic/api/v1/index_overview.go b/modules/elastic/api/v1/index_overview.go index a8420b92..f9007ee7 100644 --- a/modules/elastic/api/v1/index_overview.go +++ b/modules/elastic/api/v1/index_overview.go @@ -236,7 +236,12 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, ctx context.Context, }, }, } - metrics := h.getMetrics(ctx, query, nodeMetricItems, bucketSize) + metrics, err := h.getMetrics(ctx, query, nodeMetricItems, bucketSize) + if err != nil { + log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + return + } indexMetrics := map[string]util.MapStr{} for key, item := range metrics { for _, line := range item.Lines { @@ -565,7 +570,12 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ } metricItems = append(metricItems, metricItem) } - metrics = h.getSingleMetrics(ctx, metricItems, query, bucketSize) + metrics, err = h.getSingleMetrics(ctx, metricItems, query, bucketSize) + if err != nil { + log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + return + } } resBody["metrics"] = metrics h.WriteJSON(w, resBody, http.StatusOK) diff --git a/modules/elastic/api/v1/manage.go b/modules/elastic/api/v1/manage.go index fbd15f20..5536408d 100644 --- a/modules/elastic/api/v1/manage.go +++ b/modules/elastic/api/v1/manage.go @@ -526,7 +526,7 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http defer cancel() var metrics interface{} if util.StringInArray([]string{IndexThroughputMetricKey, SearchThroughputMetricKey, IndexLatencyMetricKey, SearchLatencyMetricKey}, key) { - metrics = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, key) + metrics, err = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, key) } else { if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.ClusterStats.Enabled && meta.Config.MonitorConfigs.ClusterStats.Interval != "" { du, _ := time.ParseDuration(meta.Config.MonitorConfigs.ClusterStats.Interval) @@ -540,7 +540,12 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http bucketSize = int(du.Seconds()) } } - metrics = h.GetClusterMetrics(ctx, id, bucketSize, min, max, key) + metrics, err = h.GetClusterMetrics(ctx, id, bucketSize, min, max, key) + } + if err != nil { + log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + return } resBody["metrics"] = metrics @@ -583,8 +588,16 @@ func (h *APIHandler) HandleIndexMetricsAction(w http.ResponseWriter, req *http.R defer cancel() var metrics map[string]*common.MetricItem if key == DocPercentMetricKey { - metrics = h.getIndexMetrics(ctx, req, id, bucketSize, min, max, indexName, top, DocCountMetricKey) - docsDeletedMetrics := h.getIndexMetrics(ctx, req, id, bucketSize, min, max, indexName, top, DocsDeletedMetricKey) + metrics, err = h.getIndexMetrics(ctx, req, id, bucketSize, min, max, indexName, top, DocCountMetricKey) + if err != nil { + h.WriteError(w, err, http.StatusInternalServerError) + return + } + docsDeletedMetrics, err := h.getIndexMetrics(ctx, req, id, bucketSize, min, max, indexName, top, DocsDeletedMetricKey) + if err != nil { + h.WriteError(w, err, http.StatusInternalServerError) + return + } for k, v := range docsDeletedMetrics { if v != nil { metrics[k] = v @@ -635,7 +648,11 @@ func (h *APIHandler) HandleIndexMetricsAction(w http.ResponseWriter, req *http.R } } }else{ - metrics = h.getIndexMetrics(ctx, req, id, bucketSize, min, max, indexName, top, key) + metrics, err = h.getIndexMetrics(ctx, req, id, bucketSize, min, max, indexName, top, key) + if err != nil { + h.WriteError(w, err, http.StatusInternalServerError) + return + } } resBody["metrics"] = metrics @@ -768,35 +785,37 @@ const ( ShardCountMetricKey = "shard_count" CircuitBreakerMetricKey = "circuit_breaker" ) -func (h *APIHandler) GetClusterMetrics(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) map[string]*common.MetricItem { +func (h *APIHandler) GetClusterMetrics(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) (map[string]*common.MetricItem, error) { - var clusterMetricsResult = map[string]*common.MetricItem {} + var ( + clusterMetricsResult = map[string]*common.MetricItem {} + err error + ) switch metricKey { case ClusterDocumentsMetricKey, ClusterStorageMetricKey, ClusterIndicesMetricKey, ClusterNodeCountMetricKey: - clusterMetricsResult = h.getClusterMetricsByKey(ctx, id, bucketSize, min, max, metricKey) + clusterMetricsResult, err = h.getClusterMetricsByKey(ctx, id, bucketSize, min, max, metricKey) case IndexLatencyMetricKey, IndexThroughputMetricKey, SearchThroughputMetricKey, SearchLatencyMetricKey: - clusterMetricsResult = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, metricKey) + clusterMetricsResult, err = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, metricKey) case ClusterHealthMetricKey: - statusMetric, err := h.getClusterStatusMetric(ctx, id, min, max, bucketSize) + var statusMetric *common.MetricItem + statusMetric, err = h.getClusterStatusMetric(ctx, id, min, max, bucketSize) if err == nil { clusterMetricsResult[ClusterHealthMetricKey] = statusMetric - } else { - log.Error("get cluster status metric error: ", err) } case ShardCountMetricKey: - clusterMetricsResult = h.getShardsMetric(ctx, id, min, max, bucketSize) + clusterMetricsResult, err = h.getShardsMetric(ctx, id, min, max, bucketSize) case CircuitBreakerMetricKey: - clusterMetricsResult = h.getCircuitBreakerMetric(ctx, id, min, max, bucketSize) + clusterMetricsResult, err = h.getCircuitBreakerMetric(ctx, id, min, max, bucketSize) } - return clusterMetricsResult + return clusterMetricsResult, err } -func (h *APIHandler) getClusterMetricsByKey(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) map[string]*common.MetricItem { +func (h *APIHandler) getClusterMetricsByKey(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) (map[string]*common.MetricItem, error) { bucketSizeStr := fmt.Sprintf("%vs", bucketSize) query := map[string]interface{}{} query["query"] = util.MapStr{ @@ -865,7 +884,7 @@ func (h *APIHandler) getClusterMetricsByKey(ctx context.Context, id string, buck meta := elastic.GetMetadata(id) if meta == nil { err := fmt.Errorf("metadata of cluster [%s] is not found", id) - panic(err) + return nil, err } majorVersion := meta.GetMajorVersion() @@ -891,7 +910,7 @@ const ( IndexLatencyMetricKey = "index_latency" SearchLatencyMetricKey = "search_latency" ) -func (h *APIHandler) GetClusterIndexMetrics(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) map[string]*common.MetricItem { +func (h *APIHandler) GetClusterIndexMetrics(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) (map[string]*common.MetricItem, error) { bucketSizeStr := fmt.Sprintf("%vs", bucketSize) metricItems := []*common.MetricItem{} switch metricKey { @@ -995,7 +1014,7 @@ func (h *APIHandler) GetClusterIndexMetrics(ctx context.Context, id string, buck return h.getSingleMetrics(ctx, metricItems, query, bucketSize) } -func (h *APIHandler) getShardsMetric(ctx context.Context, id string, min, max int64, bucketSize int) map[string]*common.MetricItem { +func (h *APIHandler) getShardsMetric(ctx context.Context, id string, min, max int64, bucketSize int) (map[string]*common.MetricItem, error) { bucketSizeStr := fmt.Sprintf("%vs", bucketSize) query := util.MapStr{ "query": util.MapStr{ @@ -1057,7 +1076,7 @@ func (h *APIHandler) getShardsMetric(ctx context.Context, id string, min, max in return h.getSingleMetrics(ctx, clusterHealthMetrics, query, bucketSize) } -func (h *APIHandler) getCircuitBreakerMetric(ctx context.Context, id string, min, max int64, bucketSize int) map[string]*common.MetricItem { +func (h *APIHandler) getCircuitBreakerMetric(ctx context.Context, id string, min, max int64, bucketSize int) (map[string]*common.MetricItem, error) { bucketSizeStr := fmt.Sprintf("%vs", bucketSize) query := util.MapStr{ "query": util.MapStr{ diff --git a/modules/elastic/api/v1/metrics_util.go b/modules/elastic/api/v1/metrics_util.go index 213d7847..626d23e6 100644 --- a/modules/elastic/api/v1/metrics_util.go +++ b/modules/elastic/api/v1/metrics_util.go @@ -110,13 +110,12 @@ func generateGroupAggs(nodeMetricItems []GroupMetricItem) map[string]interface{} return aggs } -func (h *APIHandler) getMetrics(ctx context.Context, query map[string]interface{}, grpMetricItems []GroupMetricItem, bucketSize int) map[string]*common.MetricItem { +func (h *APIHandler) getMetrics(ctx context.Context, query map[string]interface{}, grpMetricItems []GroupMetricItem, bucketSize int) (map[string]*common.MetricItem, error) { bucketSizeStr := fmt.Sprintf("%vs", bucketSize) queryDSL := util.MustToJSONBytes(query) response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).QueryDSL(ctx, getAllMetricsIndex(), nil, queryDSL) if err != nil { - log.Error(err) - panic(err) + return nil, err } grpMetricItemsIndex := map[string]int{} for i, item := range grpMetricItems { @@ -210,7 +209,7 @@ func (h *APIHandler) getMetrics(ctx context.Context, query map[string]interface{ metricItem.MetricItem.Request = string(queryDSL) result[metricItem.Key] = metricItem.MetricItem } - return result + return result, nil } func GetMinBucketSize() int { @@ -331,7 +330,7 @@ func GetMetricRangeAndBucketSize(minStr string, maxStr string, bucketSize int, m } // 获取单个指标,可以包含多条曲线 -func (h *APIHandler) getSingleMetrics(ctx context.Context, metricItems []*common.MetricItem, query map[string]interface{}, bucketSize int) map[string]*common.MetricItem { +func (h *APIHandler) getSingleMetrics(ctx context.Context, metricItems []*common.MetricItem, query map[string]interface{}, bucketSize int) (map[string]*common.MetricItem, error) { metricData := map[string][][]interface{}{} aggs := map[string]interface{}{} @@ -377,8 +376,7 @@ func (h *APIHandler) getSingleMetrics(ctx context.Context, metricItems []*common clusterID := global.MustLookupString(elastic.GlobalSystemElasticsearchID) intervalField, err := getDateHistogramIntervalField(clusterID, bucketSizeStr) if err != nil { - log.Error(err) - panic(err) + return nil, err } query["size"] = 0 query["aggs"] = util.MapStr{ @@ -393,8 +391,7 @@ func (h *APIHandler) getSingleMetrics(ctx context.Context, metricItems []*common queryDSL := util.MustToJSONBytes(query) response, err := elastic.GetClient(clusterID).QueryDSL(ctx, getAllMetricsIndex(), nil, queryDSL) if err != nil { - log.Error(err) - panic(err) + return nil, err } var minDate, maxDate int64 @@ -457,7 +454,7 @@ func (h *APIHandler) getSingleMetrics(ctx context.Context, metricItems []*common result[metricItem.Key] = metricItem } - return result + return result, nil } //func (h *APIHandler) executeQuery(query map[string]interface{}, bucketItems *[]common.BucketItem, bucketSize int) map[string]*common.MetricItem { diff --git a/modules/elastic/api/v1/node_overview.go b/modules/elastic/api/v1/node_overview.go index 050c890b..e8fcb2eb 100644 --- a/modules/elastic/api/v1/node_overview.go +++ b/modules/elastic/api/v1/node_overview.go @@ -411,7 +411,12 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps }, }, } - metrics := h.getMetrics(context.Background(), query, nodeMetricItems, bucketSize) + metrics, err := h.getMetrics(context.Background(), query, nodeMetricItems, bucketSize) + if err != nil { + log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + return + } indexMetrics := map[string]util.MapStr{} for key, item := range metrics { for _, line := range item.Lines { @@ -693,10 +698,17 @@ func (h *APIHandler) GetSingleNodeMetrics(w http.ResponseWriter, req *http.Reque metricItem =newMetricItem("parent_breaker", 8, SystemGroupKey) metricItem.AddLine("Parent Breaker Tripped","Parent Breaker Tripped","Rate of the circuit breaker has been triggered and prevented an out of memory error.","group1","payload.elasticsearch.node_stats.breakers.parent.tripped","max",bucketSizeStr,"times/s","num","0,0.[00]","0,0.[00]",false,true) metricItems=append(metricItems,metricItem) - metrics := h.getSingleMetrics(context.Background(), metricItems,query, bucketSize) + metrics, err := h.getSingleMetrics(context.Background(), metricItems,query, bucketSize) + if err != nil { + log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + return + } healthMetric, err := getNodeHealthMetric(query, bucketSize) if err != nil { log.Error(err) + h.WriteError(w, err, http.StatusInternalServerError) + return } metrics["node_health"] = healthMetric resBody["metrics"] = metrics