Skip to content

Commit

Permalink
feat: adding context param to control metric request timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
silenceqi committed Dec 6, 2024
1 parent dceceae commit 592a108
Show file tree
Hide file tree
Showing 16 changed files with 152 additions and 82 deletions.
3 changes: 2 additions & 1 deletion modules/elastic/api/cluster_overview.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package api

import (
"context"
"fmt"
"infini.sh/framework/modules/elastic/adapter"
"net/http"
Expand Down Expand Up @@ -254,7 +255,7 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request,
},
},
}
indexMetrics := h.getMetrics(query, indexMetricItems, bucketSize)
indexMetrics := h.getMetrics(context.Background(), query, indexMetricItems, bucketSize)
indexingMetricData := util.MapStr{}
for _, line := range indexMetrics["cluster_indexing"].Lines {
// remove first metric dot
Expand Down
11 changes: 6 additions & 5 deletions modules/elastic/api/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package api

import (
"context"
"fmt"
log "github.com/cihub/seelog"
httprouter "infini.sh/framework/core/api/router"
Expand Down Expand Up @@ -604,10 +605,10 @@ func (h *APIHandler) getSingleHostMetric(agentID string, min, max int64, bucketS
},
},
}
return h.getSingleMetrics(metricItems, query, bucketSize)
return h.getSingleMetrics(context.Background(), metricItems, query, bucketSize)
}

func (h *APIHandler) getSingleHostMetricFromNode(nodeID string, min, max int64, bucketSize int) map[string]*common.MetricItem {
func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID string, min, max int64, bucketSize int) map[string]*common.MetricItem {
var must = []util.MapStr{
{
"term": util.MapStr{
Expand Down Expand Up @@ -669,7 +670,7 @@ func (h *APIHandler) getSingleHostMetricFromNode(nodeID string, min, max int64,
return 100 - value*100/value2
}
metricItems = append(metricItems, metricItem)
return h.getSingleMetrics(metricItems, query, bucketSize)
return h.getSingleMetrics(ctx, metricItems, query, bucketSize)
}

func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
Expand All @@ -696,7 +697,7 @@ func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Reque
return
}
if hostInfo.AgentID == "" {
resBody["metrics"] = h.getSingleHostMetricFromNode(hostInfo.NodeID, min, max, bucketSize)
resBody["metrics"] = h.getSingleHostMetricFromNode(context.Background(), hostInfo.NodeID, min, max, bucketSize)
h.WriteJSON(w, resBody, http.StatusOK)
return
}
Expand Down Expand Up @@ -866,7 +867,7 @@ func (h *APIHandler) getGroupHostMetric(agentIDs []string, min, max int64, bucke
},
},
}
return h.getMetrics(query, hostMetricItems, bucketSize)
return h.getMetrics(context.Background(), query, hostMetricItems, bucketSize)
}

func getHost(hostID string) (*host.HostInfo, error) {
Expand Down
5 changes: 3 additions & 2 deletions modules/elastic/api/index_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package api

import (
"context"
"fmt"
log "github.com/cihub/seelog"
v1 "infini.sh/console/modules/elastic/api/v1"
Expand All @@ -39,7 +40,7 @@ import (
"time"
)

func (h *APIHandler) getIndexMetrics(req *http.Request, clusterID string, bucketSize int, min, max int64, indexName string, top int, shardID string, metricKey string) (map[string]*common.MetricItem, error){
func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clusterID string, bucketSize int, min, max int64, indexName string, top int, shardID string, metricKey string) (map[string]*common.MetricItem, error){
bucketSizeStr:=fmt.Sprintf("%vs",bucketSize)
clusterUUID, err := adapter.GetClusterUUID(clusterID)
if err != nil {
Expand Down Expand Up @@ -741,7 +742,7 @@ func (h *APIHandler) getIndexMetrics(req *http.Request, clusterID string, bucket
},
},
}
return h.getMetrics(query, indexMetricItems, bucketSize), nil
return h.getMetrics(ctx, query, indexMetricItems, bucketSize), nil

}

Expand Down
5 changes: 3 additions & 2 deletions modules/elastic/api/index_overview.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package api

import (
"context"
"fmt"
log "github.com/cihub/seelog"
httprouter "infini.sh/framework/core/api/router"
Expand Down Expand Up @@ -503,7 +504,7 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
},
},
}
metrics := h.getMetrics(query, nodeMetricItems, bucketSize)
metrics := h.getMetrics(context.Background(), query, nodeMetricItems, bucketSize)
indexMetrics := map[string]util.MapStr{}
for key, item := range metrics {
for _, line := range item.Lines {
Expand Down Expand Up @@ -929,7 +930,7 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
return value/value2
}
metricItems=append(metricItems,metricItem)
metrics := h.getSingleIndexMetrics(metricItems,query, bucketSize)
metrics := h.getSingleIndexMetrics(context.Background(), metricItems,query, bucketSize)
shardStateMetric, err := h.getIndexShardsMetric(clusterID, indexName, min, max, bucketSize)
if err != nil {
log.Error(err)
Expand Down
84 changes: 61 additions & 23 deletions modules/elastic/api/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,10 +551,19 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http
if bucketSize <= 60 {
min = min - int64(2*bucketSize*1000)
}
timeout := h.GetParameterOrDefault(req, "timeout", "60s")
du, err := time.ParseDuration(timeout)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
ctx, cancel := context.WithTimeout(context.Background(), du)
defer cancel()
if util.StringInArray([]string{IndexThroughputMetricKey, SearchThroughputMetricKey, IndexLatencyMetricKey, SearchLatencyMetricKey}, key) {
metrics = h.GetClusterIndexMetrics(id, bucketSize, min, max, key)
metrics = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, key)
}else{
metrics = h.GetClusterMetrics(id, bucketSize, min, max, key)
metrics = h.GetClusterMetrics(ctx, id, bucketSize, min, max, key)
}

resBody["metrics"] = metrics
Expand Down Expand Up @@ -582,7 +591,16 @@ func (h *APIHandler) HandleNodeMetricsAction(w http.ResponseWriter, req *http.Re
min = min - int64(2*bucketSize*1000)
}
key := h.GetParameter(req, "key")
resBody["metrics"], err = h.getNodeMetrics(id, bucketSize, min, max, nodeName, top, key)
timeout := h.GetParameterOrDefault(req, "timeout", "60s")
du, err := time.ParseDuration(timeout)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
ctx, cancel := context.WithTimeout(context.Background(), du)
defer cancel()
resBody["metrics"], err = h.getNodeMetrics(ctx, id, bucketSize, min, max, nodeName, top, key)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -626,15 +644,24 @@ func (h *APIHandler) HandleIndexMetricsAction(w http.ResponseWriter, req *http.R
min = min - int64(2*bucketSize*1000)
}
key := h.GetParameter(req, "key")
timeout := h.GetParameterOrDefault(req, "timeout", "60s")
du, err := time.ParseDuration(timeout)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
ctx, cancel := context.WithTimeout(context.Background(), du)
defer cancel()
var metrics map[string]*common.MetricItem
if key == v1.DocPercentMetricKey {
metrics, err = h.getIndexMetrics(req, id, bucketSize, min, max, indexName, top, shardID, v1.DocCountMetricKey)
metrics, err = h.getIndexMetrics(ctx, req, id, bucketSize, min, max, indexName, top, shardID, v1.DocCountMetricKey)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
docsDeletedMetrics, err := h.getIndexMetrics(req, id, bucketSize, min, max, indexName, top, shardID, v1.DocsDeletedMetricKey)
docsDeletedMetrics, err := h.getIndexMetrics(ctx, req, id, bucketSize, min, max, indexName, top, shardID, v1.DocsDeletedMetricKey)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -691,7 +718,7 @@ func (h *APIHandler) HandleIndexMetricsAction(w http.ResponseWriter, req *http.R

}
}else{
metrics, err = h.getIndexMetrics(req, id, bucketSize, min, max, indexName, top, shardID, key)
metrics, err = h.getIndexMetrics(ctx, req, id, bucketSize, min, max, indexName, top, shardID, key)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -731,7 +758,16 @@ func (h *APIHandler) HandleQueueMetricsAction(w http.ResponseWriter, req *http.R
min = min - int64(2*bucketSize*1000)
}
key := h.GetParameter(req, "key")
resBody["metrics"], err = h.getThreadPoolMetrics(id, bucketSize, min, max, nodeName, top, key)
timeout := h.GetParameterOrDefault(req, "timeout", "60s")
du, err := time.ParseDuration(timeout)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
ctx, cancel := context.WithTimeout(context.Background(), du)
defer cancel()
resBody["metrics"], err = h.getThreadPoolMetrics(ctx, id, bucketSize, min, max, nodeName, top, key)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -867,35 +903,35 @@ const (
CircuitBreakerMetricKey = "circuit_breaker"
)

func (h *APIHandler) GetClusterMetrics(id string, bucketSize int, min, max int64, metricKey string) map[string]*common.MetricItem {
func (h *APIHandler) GetClusterMetrics(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) map[string]*common.MetricItem {

var clusterMetricsResult = map[string]*common.MetricItem {}
switch metricKey {
case ClusterDocumentsMetricKey,
ClusterStorageMetricKey,
ClusterIndicesMetricKey,
ClusterNodeCountMetricKey:
clusterMetricsResult = h.getClusterMetricsByKey(id, bucketSize, min, max, metricKey)
clusterMetricsResult = h.getClusterMetricsByKey(ctx, id, bucketSize, min, max, metricKey)
case IndexLatencyMetricKey, IndexThroughputMetricKey, SearchThroughputMetricKey, SearchLatencyMetricKey:
clusterMetricsResult = h.GetClusterIndexMetrics(id, bucketSize, min, max, metricKey)
clusterMetricsResult = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, metricKey)
case ClusterHealthMetricKey:
statusMetric, err := h.getClusterStatusMetric(id, min, max, bucketSize)
statusMetric, err := h.getClusterStatusMetric(ctx, id, min, max, bucketSize)
if err == nil {
clusterMetricsResult[ClusterHealthMetricKey] = statusMetric
} else {
log.Error("get cluster status metric error: ", err)
}
case ShardCountMetricKey:
clusterMetricsResult = h.getShardsMetric(id, min, max, bucketSize)
clusterMetricsResult = h.getShardsMetric(ctx, id, min, max, bucketSize)

case CircuitBreakerMetricKey:
clusterMetricsResult = h.getCircuitBreakerMetric(id, min, max, bucketSize)
clusterMetricsResult = h.getCircuitBreakerMetric(ctx, id, min, max, bucketSize)
}

return clusterMetricsResult
}

func (h *APIHandler) getClusterMetricsByKey(id string, bucketSize int, min, max int64, metricKey string) map[string]*common.MetricItem {
func (h *APIHandler) getClusterMetricsByKey(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) map[string]*common.MetricItem {
bucketSizeStr := fmt.Sprintf("%vs", bucketSize)

clusterMetricItems := []*common.MetricItem{}
Expand Down Expand Up @@ -985,7 +1021,7 @@ func (h *APIHandler) getClusterMetricsByKey(id string, bucketSize int, min, max
},
},
}
return h.getSingleMetrics(clusterMetricItems, query, bucketSize)
return h.getSingleMetrics(ctx, clusterMetricItems, query, bucketSize)
}

const (
Expand All @@ -995,7 +1031,7 @@ const (
SearchLatencyMetricKey = "search_latency"
)

func (h *APIHandler) GetClusterIndexMetrics(id string, bucketSize int, min, max int64, metricKey string) map[string]*common.MetricItem {
func (h *APIHandler) GetClusterIndexMetrics(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) map[string]*common.MetricItem {
bucketSizeStr := fmt.Sprintf("%vs", bucketSize)
metricItems := []*common.MetricItem{}
switch metricKey {
Expand Down Expand Up @@ -1091,10 +1127,10 @@ func (h *APIHandler) GetClusterIndexMetrics(id string, bucketSize int, min, max
},
},
}
return h.getSingleIndexMetricsByNodeStats(metricItems, query, bucketSize)
return h.getSingleIndexMetricsByNodeStats(ctx, metricItems, query, bucketSize)
}

func (h *APIHandler) getShardsMetric(id string, min, max int64, bucketSize int) map[string]*common.MetricItem {
func (h *APIHandler) getShardsMetric(ctx context.Context, id string, min, max int64, bucketSize int) map[string]*common.MetricItem {
bucketSizeStr := fmt.Sprintf("%vs", bucketSize)
query := util.MapStr{
"query": util.MapStr{
Expand Down Expand Up @@ -1153,10 +1189,10 @@ func (h *APIHandler) getShardsMetric(id string, min, max int64, bucketSize int)
metricItem.AddLine("Delayed Unassigned Shards", "Delayed Unassigned Shards", "", "group1", "payload.elasticsearch.cluster_health.delayed_unassigned_shards", "max", bucketSizeStr, "", "num", "0,0.[00]", "0,0.[00]", false, false)
var clusterHealthMetrics []*common.MetricItem
clusterHealthMetrics = append(clusterHealthMetrics, metricItem)
return h.getSingleMetrics(clusterHealthMetrics, query, bucketSize)
return h.getSingleMetrics(ctx, clusterHealthMetrics, query, bucketSize)
}

func (h *APIHandler) getCircuitBreakerMetric(id string, min, max int64, bucketSize int) map[string]*common.MetricItem {
func (h *APIHandler) getCircuitBreakerMetric(ctx context.Context, id string, min, max int64, bucketSize int) map[string]*common.MetricItem {
bucketSizeStr := fmt.Sprintf("%vs", bucketSize)
query := util.MapStr{
"query": util.MapStr{
Expand Down Expand Up @@ -1214,10 +1250,10 @@ func (h *APIHandler) getCircuitBreakerMetric(id string, min, max int64, bucketSi
metricItem.AddLine("In Flight Requests Breaker Tripped", "In Flight Requests Tripped", "", "group1", "payload.elasticsearch.node_stats.breakers.in_flight_requests.tripped", "sum", bucketSizeStr, "times/s", "num", "0,0.[00]", "0,0.[00]", false, true)
var circuitBreakerMetrics []*common.MetricItem
circuitBreakerMetrics = append(circuitBreakerMetrics, metricItem)
return h.getSingleMetrics(circuitBreakerMetrics, query, bucketSize)
return h.getSingleMetrics(ctx, circuitBreakerMetrics, query, bucketSize)
}

func (h *APIHandler) getClusterStatusMetric(id string, min, max int64, bucketSize int) (*common.MetricItem, error) {
func (h *APIHandler) getClusterStatusMetric(ctx context.Context, id string, min, max int64, bucketSize int) (*common.MetricItem, error) {
bucketSizeStr := fmt.Sprintf("%vs", bucketSize)
intervalField, err := getDateHistogramIntervalField(global.MustLookupString(elastic.GlobalSystemElasticsearchID), bucketSizeStr)
if err != nil {
Expand Down Expand Up @@ -1278,7 +1314,8 @@ func (h *APIHandler) getClusterStatusMetric(id string, min, max int64, bucketSiz
},
},
}
response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).SearchWithRawQueryDSL(getAllMetricsIndex(), util.MustToJSONBytes(query))
queryDSL := util.MustToJSONBytes(query)
response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).QueryDSL(ctx, getAllMetricsIndex(), nil, queryDSL)
if err != nil {
log.Error(err)
return nil, err
Expand All @@ -1295,6 +1332,7 @@ func (h *APIHandler) getClusterStatusMetric(id string, min, max int64, bucketSiz
}
metricItem.Lines[0].Data = metricData
metricItem.Lines[0].Type = common.GraphTypeBar
metricItem.Request = string(queryDSL)
return metricItem, nil
}

Expand Down
Loading

0 comments on commit 592a108

Please sign in to comment.