Skip to content

Commit

Permalink
chore: custom request timeout error with quering metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
silenceqi committed Dec 13, 2024
1 parent a44db63 commit c6af84c
Show file tree
Hide file tree
Showing 16 changed files with 289 additions and 99 deletions.
16 changes: 16 additions & 0 deletions core/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
package core

import (
"context"
"errors"
"fmt"
cerr "infini.sh/console/core/errors"
"infini.sh/console/core/security"
"infini.sh/framework/core/api"
httprouter "infini.sh/framework/core/api/router"
Expand All @@ -37,6 +41,18 @@ type Handler struct {
api.Handler
}

func (handler Handler) WriteError(w http.ResponseWriter, err interface{}, status int) {
if v, ok := err.(error); ok {
if errors.Is(v, context.DeadlineExceeded) {
handler.Handler.WriteError(w, cerr.New(cerr.ErrTypeRequestTimeout, "", err).Error(), status)
return
}
handler.Handler.WriteError(w, v.Error(), status)
return
}
handler.Handler.WriteError(w, fmt.Sprintf("%v", err), status)
}

func (handler Handler) RequireLogin(h httprouter.Handle) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {

Expand Down
83 changes: 83 additions & 0 deletions core/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (C) INFINI Labs & INFINI LIMITED.
//
// The INFINI Console is offered under the GNU Affero General Public License v3.0
// and as commercial software.
//
// For commercial licensing, contact us at:
// - Website: infinilabs.com
// - Email: [email protected]
//
// Open Source licensed under AGPL V3:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package errors

import (
"fmt"
"infini.sh/framework/core/errors"
)

const (
ErrTypeRequestParams = "request_params_error"
ErrTypeApplication = "application_error"
ErrTypeAlreadyExists = "already_exists_error"
ErrTypeNotExists = "not_exists_error"
ErrTypeIncorrectPassword = "incorrect_password_error"
ErrTypeDomainPrefixMismatch = "domain_prefix_mismatch_error"
ErrTypeDisabled = "disabled_error"
ErrTypeRequestTimeout = "request_timeout_error"
)

var (
ErrPasswordIncorrect = errors.New("incorrect password")
ErrNotExistsErr = errors.New("not exists")
)

type Error struct {
typ string
msg interface{}
field string
}

func (err Error) Error() string {
return fmt.Sprintf("%s:%v: %v", err.typ, err.field, err.msg)
}

//NewAppError returns an application error
func NewAppError(msg any) *Error {
return New(ErrTypeApplication, "", msg)
}

//NewParamsError returns a request params error
func NewParamsError(field string, msg any) *Error {
return New(ErrTypeRequestParams, field, msg)
}

//NewAlreadyExistsError returns an already exists error
func NewAlreadyExistsError(field string, msg any) *Error {
return New(ErrTypeAlreadyExists, field, msg)
}

//NewNotExistsError returns a not exists error
func NewNotExistsError(field string, msg any) *Error {
return New(ErrTypeNotExists, field, msg)
}

func New(typ string, field string, msg any) *Error {
return &Error{
typ,
msg,
field,
}
}
13 changes: 12 additions & 1 deletion modules/elastic/api/cluster_overview.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ package api

import (
"context"
"errors"
"fmt"
cerr "infini.sh/console/core/errors"
"infini.sh/framework/modules/elastic/adapter"
"net/http"
"strings"
Expand Down Expand Up @@ -263,7 +265,16 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request,
}
ctx, cancel := context.WithTimeout(context.Background(), du)
defer cancel()
indexMetrics := h.getMetrics(ctx, query, indexMetricItems, bucketSize)
indexMetrics, err := h.getMetrics(ctx, query, indexMetricItems, bucketSize)
if err != nil {
log.Error(err)
if errors.Is(err, context.DeadlineExceeded) {
h.WriteError(w, cerr.New(cerr.ErrTypeRequestTimeout, "", err).Error(), http.StatusRequestTimeout)
return
}
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
indexingMetricData := util.MapStr{}
for _, line := range indexMetrics["cluster_indexing"].Lines {
// remove first metric dot
Expand Down
44 changes: 32 additions & 12 deletions modules/elastic/api/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,12 @@ func (h *APIHandler) FetchHostInfo(w http.ResponseWriter, req *http.Request, ps
Units: "/s",
},
}
hostMetrics := h.getGroupHostMetric(context.Background(), agentIDs, min, max, bucketSize, hostMetricItems, "agent.id")
hostMetrics, err := h.getGroupHostMetric(context.Background(), agentIDs, min, max, bucketSize, hostMetricItems, "agent.id")
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}

networkMetrics := map[string]util.MapStr{}
for key, item := range hostMetrics {
Expand Down Expand Up @@ -572,7 +577,7 @@ func (h *APIHandler) GetHostInfo(w http.ResponseWriter, req *http.Request, ps ht

}

func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, min, max int64, bucketSize int, metricItems []*common.MetricItem) map[string]*common.MetricItem {
func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, min, max int64, bucketSize int, metricItems []*common.MetricItem) (map[string]*common.MetricItem, error) {
var must = []util.MapStr{
{
"term": util.MapStr{
Expand Down Expand Up @@ -608,7 +613,7 @@ func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, mi
return h.getSingleMetrics(ctx, metricItems, query, bucketSize)
}

func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID string, min, max int64, bucketSize int, metricKey string) map[string]*common.MetricItem {
func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID string, min, max int64, bucketSize int, metricKey string) (map[string]*common.MetricItem, error) {
var must = []util.MapStr{
{
"term": util.MapStr{
Expand Down Expand Up @@ -725,7 +730,12 @@ func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Reque
ctx, cancel := context.WithTimeout(context.Background(), du)
defer cancel()
if hostInfo.AgentID == "" {
resBody["metrics"] = h.getSingleHostMetricFromNode(ctx, hostInfo.NodeID, min, max, bucketSize, key)
resBody["metrics"], err = h.getSingleHostMetricFromNode(ctx, hostInfo.NodeID, min, max, bucketSize, key)
if err != nil {
log.Error(err)
h.WriteError(w, err, http.StatusInternalServerError)
return
}
h.WriteJSON(w, resBody, http.StatusOK)
return
}
Expand Down Expand Up @@ -788,20 +798,30 @@ func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Reque
metricItem.AddLine("Disk Write Rate", "Disk Write Rate", "network write rate of host.", "group1", "payload.host.diskio_summary.write.bytes", "max", bucketSizeStr, "%", "bytes", "0,0.[00]", "0,0.[00]", false, true)
metricItems = append(metricItems, metricItem)
case DiskPartitionUsageMetricKey, NetworkInterfaceOutputRateMetricKey:
groupMetrics := h.getGroupHostMetrics(ctx, hostInfo.AgentID, min, max, bucketSize, key)
resBody["metrics"] = groupMetrics
resBody["metrics"] , err = h.getGroupHostMetrics(ctx, hostInfo.AgentID, min, max, bucketSize, key)
if err != nil {
log.Error(err)
h.WriteError(w, err, http.StatusInternalServerError)
return
}
h.WriteJSON(w, resBody, http.StatusOK)
return
}
hostMetrics := h.getSingleHostMetric(ctx, hostInfo.AgentID, min, max, bucketSize, metricItems)
hostMetrics, err := h.getSingleHostMetric(ctx, hostInfo.AgentID, min, max, bucketSize, metricItems)
if err != nil {
log.Error(err)
h.WriteError(w, err, http.StatusInternalServerError)
return
}

resBody["metrics"] = hostMetrics

h.WriteJSON(w, resBody, http.StatusOK)
}

func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, min, max int64, bucketSize int, metricKey string) map[string]*common.MetricItem {
func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, min, max int64, bucketSize int, metricKey string) (map[string]*common.MetricItem, error) {
var metrics = make(map[string]*common.MetricItem)
var err error
switch metricKey {
case DiskPartitionUsageMetricKey:
diskPartitionMetric := newMetricItem(DiskPartitionUsageMetricKey, 2, SystemGroupKey)
Expand All @@ -817,7 +837,7 @@ func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, mi
Units: "%",
},
}
metrics = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.disk_partition_usage.partition")
metrics, err = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.disk_partition_usage.partition")
case NetworkInterfaceOutputRateMetricKey:
networkOutputMetric := newMetricItem(NetworkInterfaceOutputRateMetricKey, 2, SystemGroupKey)
networkOutputMetric.AddAxi("Network interface output rate", "group1", common.PositionLeft, "bytes", "0.[0]", "0.[0]", 5, true)
Expand All @@ -832,13 +852,13 @@ func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, mi
Units: "",
},
}
metrics = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.network_interface.name")
metrics, err = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.network_interface.name")
}

return metrics
return metrics, err
}

func (h *APIHandler) getGroupHostMetric(ctx context.Context, agentIDs []string, min, max int64, bucketSize int, hostMetricItems []GroupMetricItem, groupField string) map[string]*common.MetricItem {
func (h *APIHandler) getGroupHostMetric(ctx context.Context, agentIDs []string, min, max int64, bucketSize int, hostMetricItems []GroupMetricItem, groupField string) (map[string]*common.MetricItem, error) {
var must = []util.MapStr{
{
"term": util.MapStr{
Expand Down
2 changes: 1 addition & 1 deletion modules/elastic/api/index_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu
},
},
}
return h.getMetrics(ctx, query, indexMetricItems, bucketSize), nil
return h.getMetrics(ctx, query, indexMetricItems, bucketSize)

}

Expand Down
14 changes: 12 additions & 2 deletions modules/elastic/api/index_overview.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,11 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
},
},
}
metrics := h.getMetrics(ctx, query, nodeMetricItems, bucketSize)
metrics, err := h.getMetrics(ctx, query, nodeMetricItems, bucketSize)
if err != nil {
log.Error(err)
h.WriteError(w, err, http.StatusInternalServerError)
}
indexMetrics := map[string]util.MapStr{}
for key, item := range metrics {
for _, line := range item.Lines {
Expand Down Expand Up @@ -921,6 +925,8 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
healthMetric, err := h.GetIndexHealthMetric(ctx, clusterID, indexName, min, max, bucketSize)
if err != nil {
log.Error(err)
h.WriteError(w, err, http.StatusInternalServerError)
return
}
metrics["index_health"] = healthMetric
} else {
Expand Down Expand Up @@ -987,7 +993,11 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
}
metricItems = append(metricItems, metricItem)
}
metrics = h.getSingleIndexMetrics(context.Background(), metricItems, query, bucketSize)
metrics, err = h.getSingleIndexMetrics(context.Background(), metricItems, query, bucketSize)
if err != nil {
log.Error(err)
h.WriteError(w, err, http.StatusInternalServerError)
}
}

resBody["metrics"] = metrics
Expand Down
Loading

0 comments on commit c6af84c

Please sign in to comment.