diff --git a/backend/datasource.go b/backend/datasource.go index 593ef93..6bc1057 100644 --- a/backend/datasource.go +++ b/backend/datasource.go @@ -4,11 +4,13 @@ import ( // "crypto/tls" "encoding/json" "fmt" + // "io/ioutil" // "net" // "strings" - "time" "errors" + "time" + "github.com/gocql/gocql" simplejson "github.com/bitly/go-simplejson" @@ -20,7 +22,7 @@ import ( type CassandraDatasource struct { plugin.NetRPCUnsupportedPlugin - logger hclog.Logger + logger hclog.Logger session *gocql.Session } @@ -47,7 +49,7 @@ func (ds *CassandraDatasource) Query(ctx context.Context, tsdbReq *datasource.Da return nil, err } - _, err = ds.Connect( + _, err = ds.Connect( tsdbReq.Datasource.Url, options["keyspace"], options["user"], @@ -69,7 +71,7 @@ func (ds *CassandraDatasource) Query(ctx context.Context, tsdbReq *datasource.Da case "query": return ds.MetricQuery(tsdbReq, queries, options) case "connection": - return &datasource.DatasourceResponse{}, nil; + return &datasource.DatasourceResponse{}, nil default: return nil, errors.New(fmt.Sprintf("Unknown query type '%s'", queryType)) } @@ -87,59 +89,112 @@ func (ds *CassandraDatasource) MetricQuery(tsdbReq *datasource.DatasourceRequest Series: make([]*datasource.TimeSeries, 0), } - serie := &datasource.TimeSeries{Name: queryData.Get("valueId").MustString()} + ds.logger.Debug(fmt.Sprintf("rawQuery: %v\n", queryData.Get("rawQuery").MustBool())) + ds.logger.Debug(fmt.Sprintf("target: %s\n", queryData.Get("target").MustString())) - var created_at time.Time - var value float64 + var preparedQuery string - allowFiltering := "" - if queryData.Get("filtering").MustBool() { - ds.logger.Warn("Allow filtering enabled") - allowFiltering = " ALLOW FILTERING" - } + if queryData.Get("rawQuery").MustBool() { - preparedQuery := fmt.Sprintf( - "SELECT %s, CAST(%s as double) FROM %s.%s WHERE %s = %s%s", - queryData.Get("columnTime").MustString(), - queryData.Get("columnValue").MustString(), - queryData.Get("keyspace").MustString(), - queryData.Get("table").MustString(), - queryData.Get("columnId").MustString(), - queryData.Get("valueId").MustString(), - allowFiltering, - ) + preparedQuery = queryData.Get("target").MustString() + + } else { + + allowFiltering := "" + if queryData.Get("filtering").MustBool() { + ds.logger.Warn("Allow filtering enabled") + allowFiltering = " ALLOW FILTERING" + } + preparedQuery = fmt.Sprintf( + "SELECT %s, CAST(%s as double) FROM %s.%s WHERE %s = %s%s", + queryData.Get("columnTime").MustString(), + queryData.Get("columnValue").MustString(), + queryData.Get("keyspace").MustString(), + queryData.Get("table").MustString(), + queryData.Get("columnId").MustString(), + queryData.Get("valueId").MustString(), + allowFiltering, + ) + + } ds.logger.Debug(fmt.Sprintf("Executing CQL query: '%s' ...\n", preparedQuery)) iter := ds.session.Query(preparedQuery).Iter() - for iter.Scan(&created_at, &value) { - serie.Points = append(serie.Points, &datasource.Point{ - Timestamp: created_at.UnixNano() / int64(time.Millisecond), - Value: value, - }) - } - if err := iter.Close(); err != nil { - ds.logger.Error(fmt.Sprintf("Error while processing a query: %s\n", err.Error())) - return &datasource.DatasourceResponse{ - Results: []*datasource.QueryResult{ - &datasource.QueryResult{ - Error: err.Error(), + + var id string + var timestamp time.Time + var value float64 + + if queryData.Get("rawQuery").MustBool() { + + series := make(map[string]*datasource.TimeSeries) + + for iter.Scan(&id, &value, ×tamp) { + + if _, ok := series[id]; !ok { + series[id] = &datasource.TimeSeries{Name: id} + } + + series[id].Points = append(series[id].Points, &datasource.Point{ + Timestamp: timestamp.UnixNano() / int64(time.Millisecond), + Value: value, + }) + + } + + if err := iter.Close(); err != nil { + ds.logger.Error(fmt.Sprintf("Error while processing a query: %s\n", err.Error())) + return &datasource.DatasourceResponse{ + Results: []*datasource.QueryResult{ + &datasource.QueryResult{ + Error: err.Error(), + }, }, - }, - }, nil - } + }, nil + } + + for _, serie2 := range series { + queryResult.Series = append(queryResult.Series, serie2) + } + + response.Results = append(response.Results, &queryResult) + + } else { + + serie := &datasource.TimeSeries{Name: queryData.Get("valueId").MustString()} + + for iter.Scan(×tamp, &value) { + serie.Points = append(serie.Points, &datasource.Point{ + Timestamp: timestamp.UnixNano() / int64(time.Millisecond), + Value: value, + }) + } + if err := iter.Close(); err != nil { + ds.logger.Error(fmt.Sprintf("Error while processing a query: %s\n", err.Error())) + return &datasource.DatasourceResponse{ + Results: []*datasource.QueryResult{ + &datasource.QueryResult{ + Error: err.Error(), + }, + }, + }, nil + } - queryResult.Series = append(queryResult.Series, serie) + queryResult.Series = append(queryResult.Series, serie) + + response.Results = append(response.Results, &queryResult) + + } - response.Results = append(response.Results, &queryResult) } return response, nil } -func (ds *CassandraDatasource) SearchQuery(tsdbReq *datasource.DatasourceRequest, jsonQueries []*simplejson.Json) (*datasource.DatasourceResponse, error) { +func (ds *CassandraDatasource) SearchQuery(tsdbReq *datasource.DatasourceRequest, jsonQueries []*simplejson.Json) (*datasource.DatasourceResponse, error) { keyspaceName, keyspaceOk := jsonQueries[0].CheckGet("keyspace") - tableName, tableOk := jsonQueries[0].CheckGet("table") + tableName, tableOk := jsonQueries[0].CheckGet("table") if !keyspaceOk || !tableOk { ds.logger.Warn("Unable to search as keyspace or table is not set") @@ -161,7 +216,7 @@ func (ds *CassandraDatasource) SearchQuery(tsdbReq *datasource.DatasourceRequest columns := make([]*ColumnInfo, 0) for name, column := range tableMetadata.Columns { columns = append( - columns, + columns, &ColumnInfo{ Name: name, Type: column.Type.Type().String(), @@ -178,7 +233,7 @@ func (ds *CassandraDatasource) SearchQuery(tsdbReq *datasource.DatasourceRequest return &datasource.DatasourceResponse{ Results: []*datasource.QueryResult{ &datasource.QueryResult{ - RefId: "search", + RefId: "search", Tables: []*datasource.Table{ &datasource.Table{ Rows: []*datasource.TableRow{ @@ -242,7 +297,7 @@ func (ds *CassandraDatasource) GetRequestOptions(tsdbReq *datasource.DatasourceR } func (ds *CassandraDatasource) Connect(host string, keyspace string, username string, password string) (bool, error) { - if (ds.session != nil) { + if ds.session != nil { return true, nil } @@ -258,10 +313,10 @@ func (ds *CassandraDatasource) Connect(host string, keyspace string, username st session, err := cluster.CreateSession() if err != nil { ds.logger.Error(fmt.Sprintf("Unable to establish connection with the database, %s\n", err.Error())) - return false, err; + return false, err } ds.session = session ds.logger.Debug(fmt.Sprintf("Connection successful.\n")) - return true, nil; + return true, nil } diff --git a/src/partials/query.editor.html b/src/partials/query.editor.html index a3f1325..dca0ee6 100644 --- a/src/partials/query.editor.html +++ b/src/partials/query.editor.html @@ -1,4 +1,4 @@ - +
- -