Skip to content

Commit

Permalink
feat: support querying top N metrics in the Insight Data Query API (#74)
Browse files Browse the repository at this point in the history
* feat: refactor the Insight data API to support metric aggregation for multiple data inputs

* fix:  bucket sorting issue when a sub-aggregation contains a `date_histogram`

* feat: add insight metric CRUD api

* fix: empty value of metric

* fix: incorrect grouping results

* feat: add builtin metric template

* fix: bucket sort when aggregation with P99

* chore: update group size

* chore: update release notes

* chore: update release notes

* chore: update release notes

* chore: update group size
  • Loading branch information
silenceqi authored Jan 10, 2025
1 parent f8eb0c2 commit 692b87e
Show file tree
Hide file tree
Showing 9 changed files with 960 additions and 115 deletions.
583 changes: 579 additions & 4 deletions config/setup/common/insight.tpl

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion docs/content.en/docs/release-notes/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ Information about release notes of INFINI Console is provided here.
### Features
- Add allocation to activities if is cluster health change and changed to red.
- Add index metrics for segment memory (norms, points, version map, fixed bit set).

- Support querying top N metrics in the Insight Data Query API
- Add insight metric CURD API for managing custom metrics
- Add built-in metrics templates for common use cases
### Bug fix
- Fixed query thread pool metrics when cluster uuid is empty
- Fixed unit tests
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func main() {
orm.RegisterSchemaWithIndexName(api3.RemoteConfig{}, "configs")
orm.RegisterSchemaWithIndexName(model.AuditLog{}, "audit-logs")
orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host")
orm.RegisterSchemaWithIndexName(insight.MetricBase{}, "metric")

module.Start()

Expand Down
33 changes: 27 additions & 6 deletions model/insight/metric_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package insight

import (
"fmt"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"regexp"
)
Expand All @@ -43,11 +44,35 @@ type Metric struct {
Sort []GroupSort `json:"sort,omitempty"`
ClusterId string `json:"cluster_id,omitempty"`
Formula string `json:"formula,omitempty"`
//array of formula for new version
Formulas []string `json:"formulas,omitempty"`
Items []MetricItem `json:"items"`
FormatType string `json:"format_type,omitempty"`
FormatType string `json:"format,omitempty"`
TimeFilter interface{} `json:"time_filter,omitempty"`
TimeBeforeGroup bool `json:"time_before_group,omitempty"`
BucketLabel *BucketLabel `json:"bucket_label,omitempty"`
// number of buckets to return, used for aggregation auto_date_histogram when bucket size equals 'auto'
Buckets uint `json:"buckets,omitempty"`
Unit string `json:"unit,omitempty"`
}

type MetricBase struct {
orm.ORMObjectBase
//display name of the metric
Name string `json:"name"`
//metric identifier
Key string `json:"key"`
//optional values : "node", "indices", "shard"
Level string `json:"level"`
//metric calculation formula
Formula string `json:"formula,omitempty"`
Items []MetricItem `json:"items"`
FormatType string `json:"format,omitempty"`
Unit string `json:"unit,omitempty"`
//determine if this metric is built-in
Builtin bool `json:"builtin"`
//array of supported calculation statistic, eg: "avg", "sum", "min", "max"
Statistics []string `json:"statistics,omitempty"`
}

type GroupSort struct {
Expand Down Expand Up @@ -105,12 +130,8 @@ func (m *Metric) ValidateSortKey() error {
if !util.StringInArray([]string{"desc", "asc"}, sortItem.Direction){
return fmt.Errorf("unknown sort direction [%s]", sortItem.Direction)
}
if v, ok := mm[sortItem.Key]; !ok && !util.StringInArray([]string{"_key", "_count"}, sortItem.Key){
if _, ok := mm[sortItem.Key]; !ok && !util.StringInArray([]string{"_key", "_count"}, sortItem.Key){
return fmt.Errorf("unknown sort key [%s]", sortItem.Key)
}else{
if v != nil && v.Statistic == "derivative" {
return fmt.Errorf("can not sort by pipeline agg [%s]", v.Statistic)
}
}
}
return nil
Expand Down
3 changes: 3 additions & 0 deletions plugin/api/insight/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,7 @@ func InitAPI() {
api.HandleAPIMethod(api.POST, "/elasticsearch/:id/map_label/_render", insight.renderMapLabelTemplate)
api.HandleAPIMethod(api.GET, "/insight/widget/:widget_id", insight.getWidget)
api.HandleAPIMethod(api.POST, "/insight/widget", insight.RequireLogin(insight.createWidget))
api.HandleAPIMethod(api.POST, "/insight/metric", insight.createMetric)
api.HandleAPIMethod(api.PUT, "/insight/metric/:metric_id", insight.updateMetric)
api.HandleAPIMethod(api.DELETE, "/insight/metric/:metric_id", insight.deleteMetric)
}
137 changes: 79 additions & 58 deletions plugin/api/insight/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ func getMetricData(metric *insight.Metric) (interface{}, error) {
return nil, err
}
esClient := elastic.GetClient(metric.ClusterId)
//log.Error(string(util.MustToJSONBytes(query)))
searchRes, err := esClient.SearchWithRawQueryDSL(metric.IndexPattern, util.MustToJSONBytes(query))
queryDSL := util.MustToJSONBytes(query)
searchRes, err := esClient.SearchWithRawQueryDSL(metric.IndexPattern, queryDSL)
if err != nil {
return nil, err
}
Expand All @@ -266,83 +266,101 @@ func getMetricData(metric *insight.Metric) (interface{}, error) {
}
}
timeBeforeGroup := metric.AutoTimeBeforeGroup()
metricData := CollectMetricData(agg, timeBeforeGroup)
metricData, interval := CollectMetricData(agg, timeBeforeGroup)
formula := strings.TrimSpace(metric.Formula)
//support older versions for a single formula.
if metric.Formula != "" && len(metric.Formulas) == 0 {
metric.Formulas = []string{metric.Formula}
}

var targetMetricData []insight.MetricData
formula := strings.TrimSpace(metric.Formula)
if len(metric.Items) == 1 && formula == "" {
if len(metric.Items) == 1 && len(metric.Formulas) == 0 {
targetMetricData = metricData
} else {
tpl, err := template.New("insight_formula").Parse(formula)
if err != nil {
return nil, err
}
msgBuffer := &bytes.Buffer{}
params := map[string]interface{}{}
if metric.BucketSize != "" {
du, err := util.ParseDuration(metric.BucketSize)
if err != nil {
return nil, err
bucketSize := metric.BucketSize
if metric.BucketSize == "auto" && interval != "" {
bucketSize = interval
}
if interval != "" || bucketSize != "auto" {
du, err := util.ParseDuration(bucketSize)
if err != nil {
return nil, err
}
params["bucket_size_in_second"] = du.Seconds()
}
params["bucket_size_in_second"] = du.Seconds()
}
err = tpl.Execute(msgBuffer, params)
if err != nil {
return nil, err
}
formula = msgBuffer.String()
for _, md := range metricData {
targetData := insight.MetricData{
Groups: md.Groups,
Data: map[string][]insight.MetricDataItem{},
}
expression, err := govaluate.NewEvaluableExpression(formula)
if err != nil {
return nil, err
}
dataLength := 0
for _, v := range md.Data {
dataLength = len(v)
break
}
DataLoop:
for i := 0; i < dataLength; i++ {
parameters := map[string]interface{}{}
var timestamp interface{}
hasValidData := false
for k, v := range md.Data {
if len(k) == 20 {
continue
}
if len(v) < dataLength {
continue
}
if _, ok := v[i].Value.(float64); !ok {
continue DataLoop
}
hasValidData = true
parameters[k] = v[i].Value
timestamp = v[i].Timestamp
retMetricDataItem := insight.MetricDataItem{}
for _, formula = range metric.Formulas {
tpl, err := template.New("insight_formula").Parse(formula)
if err != nil {
return nil, err
}
//todo return error?
if !hasValidData {
continue
msgBuffer := &bytes.Buffer{}
err = tpl.Execute(msgBuffer, params)
if err != nil {
return nil, err
}
result, err := expression.Evaluate(parameters)
resolvedFormula := msgBuffer.String()
expression, err := govaluate.NewEvaluableExpression(resolvedFormula)
if err != nil {
return nil, err
}
if r, ok := result.(float64); ok {
if math.IsNaN(r) || math.IsInf(r, 0) {
//if !isFilterNaN {
// targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()})
//}
dataLength := 0
for _, v := range md.Data {
dataLength = len(v)
break
}
DataLoop:
for i := 0; i < dataLength; i++ {
parameters := map[string]interface{}{}
var timestamp interface{}
hasValidData := false
for k, v := range md.Data {
if _, ok := v[i].Value.(float64); !ok {
continue DataLoop
}
hasValidData = true
parameters[k] = v[i].Value
timestamp = v[i].Timestamp
}
//todo return error?
if !hasValidData {
continue
}
result, err := expression.Evaluate(parameters)
if err != nil {
log.Debugf("evaluate formula error: %v", err)
continue
}
if r, ok := result.(float64); ok {
if math.IsNaN(r) || math.IsInf(r, 0) {
//if !isFilterNaN {
// targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()})
//}
continue
}
}
retMetricDataItem.Timestamp = timestamp
if len(metric.Formulas) <= 1 && metric.Formula != ""{
//support older versions by returning the result for a single formula.
retMetricDataItem.Value = result
} else {
if v, ok := retMetricDataItem.Value.(map[string]interface{}); ok {
v[formula] = result
}else{
retMetricDataItem.Value = map[string]interface{}{formula: result}
}
}
}

targetData.Data["result"] = append(targetData.Data["result"], insight.MetricDataItem{Timestamp: timestamp, Value: result})
}
targetData.Data["result"] = append(targetData.Data["result"], retMetricDataItem)
targetMetricData = append(targetMetricData, targetData)
}
}
Expand All @@ -356,7 +374,10 @@ func getMetricData(metric *insight.Metric) (interface{}, error) {
}
}
}
return result, nil
return util.MapStr{
"data": result,
"request": string(queryDSL),
}, nil
}

func getMetadataByIndexPattern(clusterID, indexPattern, timeField string, filter interface{}, fieldsFormat map[string]string) (interface{}, error) {
Expand Down
Loading

0 comments on commit 692b87e

Please sign in to comment.