Skip to content

Commit

Permalink
issue-83: refactor backend logic, simplify a lot (tests works well)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryk-dk committed Oct 31, 2024
1 parent 380b13a commit 25b8aac
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 91 deletions.
159 changes: 74 additions & 85 deletions pkg/plugin/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
)

var (
_ backend.StreamHandler = &Datasource{}
_ backend.StreamHandler = &Datasource{}
_ backend.QueryDataHandler = &Datasource{}
_ backend.CheckHealthHandler = &Datasource{}
_ instancemgmt.InstanceDisposer = &Datasource{}
)

const (
Expand All @@ -40,6 +43,7 @@ func NewDatasource(ctx context.Context, settings backend.DataSourceInstanceSetti
return &Datasource{
settings: settings,
httpClient: cl,
streamCh: make(chan *data.Frame),
}, nil
}

Expand All @@ -49,6 +53,7 @@ type Datasource struct {
settings backend.DataSourceInstanceSettings

httpClient *http.Client
streamCh chan *data.Frame
}

func (d *Datasource) SubscribeStream(ctx context.Context, request *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
Expand All @@ -64,69 +69,11 @@ func (d *Datasource) PublishStream(ctx context.Context, request *backend.Publish
}

func (d *Datasource) RunStream(ctx context.Context, request *backend.RunStreamRequest, sender *backend.StreamSender) error {
q := Query{}
if err := json.Unmarshal(request.Data, &q); err != nil {
return fmt.Errorf("failed to parse query json: %w", err)
}

var settings struct {
HTTPMethod string `json:"httpMethod"`
QueryParams string `json:"customQueryParameters"`
}
if err := json.Unmarshal(d.settings.JSONData, &settings); err != nil {
return fmt.Errorf("failed to parse datasource settings: %w", err)
}
if settings.HTTPMethod == "" {
settings.HTTPMethod = http.MethodPost
}

q.TimeRange = TimeRange(q.TimeRange)
reqURL, err := q.queryTailURL(d.settings.URL, settings.QueryParams)
if err != nil {
return fmt.Errorf("failed to create request URL: %w", err)
}
backend.Logger.Info("Request URL: %s", reqURL)
req, err := http.NewRequestWithContext(ctx, settings.HTTPMethod, reqURL, nil)
if err != nil {
return fmt.Errorf("failed to create new request with context: %w", err)
}
resp, err := d.httpClient.Do(req)
if err != nil {
if !isTrivialError(err) {
// Return unexpected error to the caller.
return err
}

// Something in the middle between client and datasource might be closing
// the connection. So we do a one more attempt in hope request will succeed.
req, err = http.NewRequestWithContext(ctx, settings.HTTPMethod, reqURL, nil)
if err != nil {
return fmt.Errorf("failed to create new request with context: %w", err)
}
resp, err = d.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to make http request: %w", err)
}
}
defer func() {
if err := resp.Body.Close(); err != nil {
log.DefaultLogger.Error("failed to close response body", "err", err.Error())
}
}()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("got unexpected response status code: %d", resp.StatusCode)
}

rspCh := make(chan *data.Frame)

go func() {
prev := data.FrameJSONCache{}
for frame := range rspCh {
if err != nil {
backend.Logger.Error("Failed to unmarshal frame", "error", err)
continue
}
var err error
for frame := range d.streamCh {
next, _ := data.FrameToJSONCache(frame)
if next.SameSchema(&prev) {
err = sender.SendBytes(next.Bytes(data.IncludeDataOnly))
Expand All @@ -136,12 +83,18 @@ func (d *Datasource) RunStream(ctx context.Context, request *backend.RunStreamRe
prev = next

if err != nil {
// TODO I can't find any of this error in the code
// so just check the error message
if strings.Contains(err.Error(), "rpc error: code = Canceled desc = context canceled") {
backend.Logger.Info("Client has canceled the request")
break
}
backend.Logger.Error("Failed send frame", "error", err)
}
}
}()

if err := parseStreamResponse(resp.Body, rspCh); err != nil {
if err := d.streamQuery(ctx, request); err != nil {
return fmt.Errorf("failed to parse stream response: %w", err)
}

Expand All @@ -154,6 +107,7 @@ func (d *Datasource) RunStream(ctx context.Context, request *backend.RunStreamRe
func (d *Datasource) Dispose() {
// Clean up datasource instance resources.
d.httpClient.CloseIdleConnections()
close(d.streamCh)
}

// QueryData handles multiple queries and returns multiple responses.
Expand All @@ -176,69 +130,104 @@ func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataReques
return response, nil
}

func (d *Datasource) query(ctx context.Context, _ backend.PluginContext, query backend.DataQuery) backend.DataResponse {
func (d *Datasource) streamQuery(ctx context.Context, request *backend.RunStreamRequest) error {
q, err := d.getQueryFromRaw(request.Data)
if err != nil {
return err
}

r, err := d.datasourceQuery(ctx, q, true)
if err != nil {
return err
}

return parseStreamResponse(r, d.streamCh)
}

// query sends a query to the datasource and returns the result.
func (d *Datasource) getQueryFromRaw(data json.RawMessage) (*Query, error) {
var q Query
if err := json.Unmarshal(query.JSON, &q); err != nil {
err = fmt.Errorf("failed to parse query json: %s", err)
return newResponseError(err, backend.StatusBadRequest)
if err := json.Unmarshal(data, &q); err != nil {
return nil, fmt.Errorf("failed to parse query json: %s", err)
}
return &q, nil
}

func (d *Datasource) datasourceQuery(ctx context.Context, q *Query, isStream bool) (io.ReadCloser, error) {

var settings struct {
HTTPMethod string `json:"httpMethod"`
QueryParams string `json:"customQueryParameters"`
}
if err := json.Unmarshal(d.settings.JSONData, &settings); err != nil {
err = fmt.Errorf("failed to parse datasource settings: %w", err)
return newResponseError(err, backend.StatusBadRequest)
return nil, fmt.Errorf("failed to parse datasource settings: %w", err)
}
if settings.HTTPMethod == "" {
settings.HTTPMethod = http.MethodPost
}

q.TimeRange = TimeRange(query.TimeRange)
reqURL, err := q.getQueryURL(d.settings.URL, settings.QueryParams)
if err != nil {
err = fmt.Errorf("failed to create request URL: %w", err)
return newResponseError(err, backend.StatusBadRequest)
return nil, fmt.Errorf("failed to create request URL: %w", err)
}

if isStream {
reqURL, err = q.queryTailURL(d.settings.URL, settings.QueryParams)
if err != nil {
return nil, fmt.Errorf("failed to create request URL: %w", err)
}
}

req, err := http.NewRequestWithContext(ctx, settings.HTTPMethod, reqURL, nil)
if err != nil {
err = fmt.Errorf("failed to create new request with context: %w", err)
return newResponseError(err, backend.StatusInternal)
return nil, fmt.Errorf("failed to create new request with context: %w", err)
}
resp, err := d.httpClient.Do(req)
if err != nil {
if !isTrivialError(err) {
// Return unexpected error to the caller.
return newResponseError(err, backend.StatusBadRequest)
return nil, err
}

// Something in the middle between client and datasource might be closing
// the connection. So we do a one more attempt in hope request will succeed.
req, err = http.NewRequestWithContext(ctx, settings.HTTPMethod, reqURL, nil)
if err != nil {
err = fmt.Errorf("failed to create new request with context: %w", err)
return newResponseError(err, backend.StatusBadRequest)
return nil, fmt.Errorf("failed to create new request with context: %w", err)
}
resp, err = d.httpClient.Do(req)
if err != nil {
err = fmt.Errorf("failed to make http request: %w", err)
return newResponseError(err, backend.StatusBadRequest)
return nil, fmt.Errorf("failed to make http request: %w", err)
}
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("got unexpected response status code: %d", resp.StatusCode)
}

return resp.Body, nil
}

func (d *Datasource) query(ctx context.Context, _ backend.PluginContext, query backend.DataQuery) backend.DataResponse {
q, err := d.getQueryFromRaw(query.JSON)
if err != nil {
return newResponseError(err, backend.StatusBadRequest)
}

q.TimeRange = TimeRange(query.TimeRange)

r, err := d.datasourceQuery(ctx, q, false)
if err != nil {
return newResponseError(err, backend.StatusInternal)
}

defer func() {
if err := resp.Body.Close(); err != nil {
if err := r.Close(); err != nil {
log.DefaultLogger.Error("failed to close response body", "err", err.Error())
}
}()

if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("got unexpected response status code: %d", resp.StatusCode)
return newResponseError(err, backend.Status(resp.StatusCode))
}

return parseInstantResponse(resp.Body)
return parseInstantResponse(r)
}

// CheckHealth handles health checks sent from Grafana to the plugin.
Expand Down
12 changes: 6 additions & 6 deletions pkg/plugin/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
gLabelsField = "labels"
gTimeField = "Time"
gLineField = "Line"

logsVisualisation = "logs"
)

// parseStreamResponse reads data from the reader and collects
Expand Down Expand Up @@ -148,6 +150,8 @@ func parseInstantResponse(reader io.Reader) backend.DataResponse {

// parseStreamResponse reads data from the reader and collects
// fields and frame with necessary information
// it looks like the parseInstantResponse function, but it reads data and continuously
// parse the lines from the reader and we need to collect only one data.Frame
func parseStreamResponse(reader io.Reader, ch chan *data.Frame) error {

br := bufio.NewReaderSize(reader, 64*1024)
Expand Down Expand Up @@ -254,16 +258,12 @@ func parseStreamResponse(reader io.Reader, ch chan *data.Frame) error {
}

frame := data.NewFrame("", timeFd, lineField, labelsField)
frame.Meta = &data.FrameMeta{PreferredVisualization: "logs"}
// this is necessary information because the logs visualization is preferred
frame.Meta = &data.FrameMeta{PreferredVisualization: logsVisualisation}

ch <- frame

}

// rsp := backend.DataResponse{}
// frame.Meta = &data.FrameMeta{}
// rsp.Frames = append(rsp.Frames, frame)

return nil
}

Expand Down

0 comments on commit 25b8aac

Please sign in to comment.