From fa56d35a483f89b0fb1bd74a9c3ee4ae4ca31623 Mon Sep 17 00:00:00 2001 From: Laurence Jones Date: Fri, 9 Feb 2024 13:37:49 +0000 Subject: [PATCH] [Loki] Set headers/basic auth if set for queryRange (#2815) --- .../loki/internal/lokiclient/loki_client.go | 41 ++++++++++++------- pkg/acquisition/modules/loki/loki_test.go | 33 ++++++++++----- 2 files changed, 48 insertions(+), 26 deletions(-) diff --git a/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go b/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go index 8451a86fcdf..d2af4e8af28 100644 --- a/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go +++ b/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go @@ -25,6 +25,7 @@ type LokiClient struct { t *tomb.Tomb fail_start time.Time currentTickerInterval time.Duration + requestHeaders map[string]string } type Config struct { @@ -116,7 +117,7 @@ func (lc *LokiClient) queryRange(uri string, ctx context.Context, c chan *LokiQu case <-lc.t.Dying(): return lc.t.Err() case <-ticker.C: - resp, err := http.Get(uri) + resp, err := lc.Get(uri) if err != nil { if ok := lc.shouldRetry(); !ok { return errors.Wrapf(err, "error querying range") @@ -127,6 +128,7 @@ func (lc *LokiClient) queryRange(uri string, ctx context.Context, c chan *LokiQu } if resp.StatusCode != http.StatusOK { + lc.Logger.Warnf("bad HTTP response code for query range: %d", resp.StatusCode) body, _ := io.ReadAll(resp.Body) resp.Body.Close() if ok := lc.shouldRetry(); !ok { @@ -215,7 +217,7 @@ func (lc *LokiClient) Ready(ctx context.Context) error { return lc.t.Err() case <-tick.C: lc.Logger.Debug("Checking if Loki is ready") - resp, err := http.Get(url) + resp, err := lc.Get(url) if err != nil { lc.Logger.Warnf("Error checking if Loki is ready: %s", err) continue @@ -251,10 +253,9 @@ func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) { } requestHeader := http.Header{} - for k, v := range lc.config.Headers { + for k, v := range lc.requestHeaders { requestHeader.Add(k, v) } - requestHeader.Set("User-Agent", "Crowdsec "+cwversion.VersionStr()) lc.Logger.Infof("Connecting to %s", u) conn, _, err := dialer.Dial(u, requestHeader) @@ -293,16 +294,6 @@ func (lc *LokiClient) QueryRange(ctx context.Context, infinite bool) chan *LokiQ lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, time.Now().Add(-lc.config.Since)) - requestHeader := http.Header{} - for k, v := range lc.config.Headers { - requestHeader.Add(k, v) - } - - if lc.config.Username != "" || lc.config.Password != "" { - requestHeader.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(lc.config.Username+":"+lc.config.Password))) - } - - requestHeader.Set("User-Agent", "Crowdsec "+cwversion.VersionStr()) lc.Logger.Infof("Connecting to %s", url) lc.t.Go(func() error { return lc.queryRange(url, ctx, c, infinite) @@ -310,6 +301,26 @@ func (lc *LokiClient) QueryRange(ctx context.Context, infinite bool) chan *LokiQ return c } +// Create a wrapper for http.Get to be able to set headers and auth +func (lc *LokiClient) Get(url string) (*http.Response, error) { + request, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + for k, v := range lc.requestHeaders { + request.Header.Add(k, v) + } + return http.DefaultClient.Do(request) +} + func NewLokiClient(config Config) *LokiClient { - return &LokiClient{Logger: log.WithField("component", "lokiclient"), config: config} + headers := make(map[string]string) + for k, v := range config.Headers { + headers[k] = v + } + if config.Username != "" || config.Password != "" { + headers["Authorization"] = "Basic " + base64.StdEncoding.EncodeToString([]byte(config.Username+":"+config.Password)) + } + headers["User-Agent"] = "Crowdsec " + cwversion.VersionStr() + return &LokiClient{Logger: log.WithField("component", "lokiclient"), config: config, requestHeaders: headers} } diff --git a/pkg/acquisition/modules/loki/loki_test.go b/pkg/acquisition/modules/loki/loki_test.go index fae2e3aa98f..6cac1c0fec3 100644 --- a/pkg/acquisition/modules/loki/loki_test.go +++ b/pkg/acquisition/modules/loki/loki_test.go @@ -276,10 +276,17 @@ func feedLoki(logger *log.Entry, n int, title string) error { if err != nil { return err } - resp, err := http.Post("http://127.0.0.1:3100/loki/api/v1/push", "application/json", bytes.NewBuffer(buff)) + req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:3100/loki/api/v1/push", bytes.NewBuffer(buff)) if err != nil { return err } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Scope-OrgID", "1234") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() if resp.StatusCode != http.StatusNoContent { b, _ := io.ReadAll(resp.Body) logger.Error(string(b)) @@ -306,6 +313,8 @@ mode: cat source: loki url: http://127.0.0.1:3100 query: '{server="demo",key="%s"}' +headers: + x-scope-orgid: "1234" since: 1h `, title), }, @@ -362,26 +371,26 @@ func TestStreamingAcquisition(t *testing.T) { }{ { name: "Bad port", - config: ` -mode: tail + config: `mode: tail source: loki -url: http://127.0.0.1:3101 +url: "http://127.0.0.1:3101" +headers: + x-scope-orgid: "1234" query: > - {server="demo"} -`, // No Loki server here + {server="demo"}`, // No Loki server here expectedErr: "", streamErr: `loki is not ready: context deadline exceeded`, expectedLines: 0, }, { name: "ok", - config: ` -mode: tail + config: `mode: tail source: loki -url: http://127.0.0.1:3100 +url: "http://127.0.0.1:3100" +headers: + x-scope-orgid: "1234" query: > - {server="demo"} -`, + {server="demo"}`, expectedErr: "", streamErr: "", expectedLines: 20, @@ -456,6 +465,8 @@ func TestStopStreaming(t *testing.T) { mode: tail source: loki url: http://127.0.0.1:3100 +headers: + x-scope-orgid: "1234" query: > {server="demo"} `