Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support querying top N metrics in the Insight Data Query API #74

Merged
merged 13 commits into from
Jan 10, 2025
583 changes: 579 additions & 4 deletions config/setup/common/insight.tpl

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion docs/content.en/docs/release-notes/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ Information about release notes of INFINI Console is provided here.
### Features
- Add allocation to activities if is cluster health change and changed to red.
- Add index metrics for segment memory (norms, points, version map, fixed bit set).

- Support querying top N metrics in the Insight Data Query API
- Add insight metric CURD API for managing custom metrics
- Add built-in metrics templates for common use cases
### Bug fix
- Fixed query thread pool metrics when cluster uuid is empty
- Fixed unit tests
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func main() {
orm.RegisterSchemaWithIndexName(api3.RemoteConfig{}, "configs")
orm.RegisterSchemaWithIndexName(model.AuditLog{}, "audit-logs")
orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host")
orm.RegisterSchemaWithIndexName(insight.MetricBase{}, "metric")

module.Start()

Expand Down
33 changes: 27 additions & 6 deletions model/insight/metric_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package insight

import (
"fmt"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"regexp"
)
Expand All @@ -43,11 +44,35 @@ type Metric struct {
Sort []GroupSort `json:"sort,omitempty"`
ClusterId string `json:"cluster_id,omitempty"`
Formula string `json:"formula,omitempty"`
//array of formula for new version
Formulas []string `json:"formulas,omitempty"`
Items []MetricItem `json:"items"`
FormatType string `json:"format_type,omitempty"`
FormatType string `json:"format,omitempty"`
TimeFilter interface{} `json:"time_filter,omitempty"`
TimeBeforeGroup bool `json:"time_before_group,omitempty"`
BucketLabel *BucketLabel `json:"bucket_label,omitempty"`
// number of buckets to return, used for aggregation auto_date_histogram when bucket size equals 'auto'
Buckets uint `json:"buckets,omitempty"`
Unit string `json:"unit,omitempty"`
}

type MetricBase struct {
orm.ORMObjectBase
//display name of the metric
Name string `json:"name"`
//metric identifier
Key string `json:"key"`
//optional values : "node", "indices", "shard"
Level string `json:"level"`
//metric calculation formula
Formula string `json:"formula,omitempty"`
Items []MetricItem `json:"items"`
FormatType string `json:"format,omitempty"`
Unit string `json:"unit,omitempty"`
//determine if this metric is built-in
Builtin bool `json:"builtin"`
//array of supported calculation statistic, eg: "avg", "sum", "min", "max"
Statistics []string `json:"statistics,omitempty"`
}

type GroupSort struct {
Expand Down Expand Up @@ -105,12 +130,8 @@ func (m *Metric) ValidateSortKey() error {
if !util.StringInArray([]string{"desc", "asc"}, sortItem.Direction){
return fmt.Errorf("unknown sort direction [%s]", sortItem.Direction)
}
if v, ok := mm[sortItem.Key]; !ok && !util.StringInArray([]string{"_key", "_count"}, sortItem.Key){
if _, ok := mm[sortItem.Key]; !ok && !util.StringInArray([]string{"_key", "_count"}, sortItem.Key){
return fmt.Errorf("unknown sort key [%s]", sortItem.Key)
}else{
if v != nil && v.Statistic == "derivative" {
return fmt.Errorf("can not sort by pipeline agg [%s]", v.Statistic)
}
}
}
return nil
Expand Down
3 changes: 3 additions & 0 deletions plugin/api/insight/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,7 @@ func InitAPI() {
api.HandleAPIMethod(api.POST, "/elasticsearch/:id/map_label/_render", insight.renderMapLabelTemplate)
api.HandleAPIMethod(api.GET, "/insight/widget/:widget_id", insight.getWidget)
api.HandleAPIMethod(api.POST, "/insight/widget", insight.RequireLogin(insight.createWidget))
api.HandleAPIMethod(api.POST, "/insight/metric", insight.createMetric)
api.HandleAPIMethod(api.PUT, "/insight/metric/:metric_id", insight.updateMetric)
api.HandleAPIMethod(api.DELETE, "/insight/metric/:metric_id", insight.deleteMetric)
}
137 changes: 79 additions & 58 deletions plugin/api/insight/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ func getMetricData(metric *insight.Metric) (interface{}, error) {
return nil, err
}
esClient := elastic.GetClient(metric.ClusterId)
//log.Error(string(util.MustToJSONBytes(query)))
searchRes, err := esClient.SearchWithRawQueryDSL(metric.IndexPattern, util.MustToJSONBytes(query))
queryDSL := util.MustToJSONBytes(query)
searchRes, err := esClient.SearchWithRawQueryDSL(metric.IndexPattern, queryDSL)
if err != nil {
return nil, err
}
Expand All @@ -266,83 +266,101 @@ func getMetricData(metric *insight.Metric) (interface{}, error) {
}
}
timeBeforeGroup := metric.AutoTimeBeforeGroup()
metricData := CollectMetricData(agg, timeBeforeGroup)
metricData, interval := CollectMetricData(agg, timeBeforeGroup)
formula := strings.TrimSpace(metric.Formula)
//support older versions for a single formula.
if metric.Formula != "" && len(metric.Formulas) == 0 {
metric.Formulas = []string{metric.Formula}
}

var targetMetricData []insight.MetricData
formula := strings.TrimSpace(metric.Formula)
if len(metric.Items) == 1 && formula == "" {
if len(metric.Items) == 1 && len(metric.Formulas) == 0 {
targetMetricData = metricData
} else {
tpl, err := template.New("insight_formula").Parse(formula)
if err != nil {
return nil, err
}
msgBuffer := &bytes.Buffer{}
params := map[string]interface{}{}
if metric.BucketSize != "" {
du, err := util.ParseDuration(metric.BucketSize)
if err != nil {
return nil, err
bucketSize := metric.BucketSize
if metric.BucketSize == "auto" && interval != "" {
bucketSize = interval
}
if interval != "" || bucketSize != "auto" {
du, err := util.ParseDuration(bucketSize)
if err != nil {
return nil, err
}
params["bucket_size_in_second"] = du.Seconds()
}
params["bucket_size_in_second"] = du.Seconds()
}
err = tpl.Execute(msgBuffer, params)
if err != nil {
return nil, err
}
formula = msgBuffer.String()
for _, md := range metricData {
targetData := insight.MetricData{
Groups: md.Groups,
Data: map[string][]insight.MetricDataItem{},
}
expression, err := govaluate.NewEvaluableExpression(formula)
if err != nil {
return nil, err
}
dataLength := 0
for _, v := range md.Data {
dataLength = len(v)
break
}
DataLoop:
for i := 0; i < dataLength; i++ {
parameters := map[string]interface{}{}
var timestamp interface{}
hasValidData := false
for k, v := range md.Data {
if len(k) == 20 {
continue
}
if len(v) < dataLength {
continue
}
if _, ok := v[i].Value.(float64); !ok {
continue DataLoop
}
hasValidData = true
parameters[k] = v[i].Value
timestamp = v[i].Timestamp
retMetricDataItem := insight.MetricDataItem{}
for _, formula = range metric.Formulas {
tpl, err := template.New("insight_formula").Parse(formula)
if err != nil {
return nil, err
}
//todo return error?
if !hasValidData {
continue
msgBuffer := &bytes.Buffer{}
err = tpl.Execute(msgBuffer, params)
if err != nil {
return nil, err
}
result, err := expression.Evaluate(parameters)
resolvedFormula := msgBuffer.String()
expression, err := govaluate.NewEvaluableExpression(resolvedFormula)
if err != nil {
return nil, err
}
if r, ok := result.(float64); ok {
if math.IsNaN(r) || math.IsInf(r, 0) {
//if !isFilterNaN {
// targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()})
//}
dataLength := 0
for _, v := range md.Data {
dataLength = len(v)
break
}
DataLoop:
for i := 0; i < dataLength; i++ {
parameters := map[string]interface{}{}
var timestamp interface{}
hasValidData := false
for k, v := range md.Data {
if _, ok := v[i].Value.(float64); !ok {
continue DataLoop
}
hasValidData = true
parameters[k] = v[i].Value
timestamp = v[i].Timestamp
}
//todo return error?
if !hasValidData {
continue
}
result, err := expression.Evaluate(parameters)
if err != nil {
log.Debugf("evaluate formula error: %v", err)
continue
}
if r, ok := result.(float64); ok {
if math.IsNaN(r) || math.IsInf(r, 0) {
//if !isFilterNaN {
// targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()})
//}
continue
}
}
retMetricDataItem.Timestamp = timestamp
if len(metric.Formulas) <= 1 && metric.Formula != ""{
//support older versions by returning the result for a single formula.
retMetricDataItem.Value = result
} else {
if v, ok := retMetricDataItem.Value.(map[string]interface{}); ok {
v[formula] = result
}else{
retMetricDataItem.Value = map[string]interface{}{formula: result}
}
}
}

targetData.Data["result"] = append(targetData.Data["result"], insight.MetricDataItem{Timestamp: timestamp, Value: result})
}
targetData.Data["result"] = append(targetData.Data["result"], retMetricDataItem)
targetMetricData = append(targetMetricData, targetData)
}
}
Expand All @@ -356,7 +374,10 @@ func getMetricData(metric *insight.Metric) (interface{}, error) {
}
}
}
return result, nil
return util.MapStr{
"data": result,
"request": string(queryDSL),
}, nil
}

func getMetadataByIndexPattern(clusterID, indexPattern, timeField string, filter interface{}, fieldsFormat map[string]string) (interface{}, error) {
Expand Down
Loading
Loading