diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index d50787b652b..c57e6a67c94 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -53,6 +53,7 @@ type LokiConfiguration struct { WaitForReady time.Duration `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds Auth LokiAuthConfiguration `yaml:"auth"` MaxFailureDuration time.Duration `yaml:"max_failure_duration"` // Max duration of failure before stopping the source + NoReadyCheck bool `yaml:"no_ready_check"` // Bypass /ready check before starting configuration.DataSourceCommonCfg `yaml:",inline"` } @@ -229,6 +230,14 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger l.logger.Logger.SetLevel(level) } + if noReadyCheck := params.Get("no_ready_check"); noReadyCheck != "" { + noReadyCheck, err := strconv.ParseBool(noReadyCheck) + if err != nil { + return fmt.Errorf("invalid no_ready_check in dsn: %w", err) + } + l.Config.NoReadyCheck = noReadyCheck + } + l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host) if u.User != nil { l.Config.Auth.Username = u.User.Username() @@ -264,26 +273,28 @@ func (l *LokiSource) GetName() string { func (l *LokiSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { l.logger.Debug("Loki one shot acquisition") l.Client.SetTomb(t) - readyCtx, cancel := context.WithTimeout(ctx, l.Config.WaitForReady) - defer cancel() - err := l.Client.Ready(readyCtx) - if err != nil { - return fmt.Errorf("loki is not ready: %w", err) + + if !l.Config.NoReadyCheck { + readyCtx, readyCancel := context.WithTimeout(ctx, l.Config.WaitForReady) + defer readyCancel() + err := l.Client.Ready(readyCtx) + if err != nil { + return fmt.Errorf("loki is not ready: %w", err) + } } - ctx, cancel = context.WithCancel(ctx) - c := l.Client.QueryRange(ctx, false) + lokiCtx, cancel := context.WithCancel(ctx) + defer cancel() + c := l.Client.QueryRange(lokiCtx, false) for { select { case <-t.Dying(): l.logger.Debug("Loki one shot acquisition stopped") - cancel() return nil case resp, ok := <-c: if !ok { l.logger.Info("Loki acquisition done, chan closed") - cancel() return nil } for _, stream := range resp.Data.Result { @@ -314,27 +325,26 @@ func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]stri func (l *LokiSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { l.Client.SetTomb(t) - readyCtx, cancel := context.WithTimeout(ctx, l.Config.WaitForReady) - defer cancel() - err := l.Client.Ready(readyCtx) - if err != nil { - return fmt.Errorf("loki is not ready: %w", err) + + if !l.Config.NoReadyCheck { + readyCtx, readyCancel := context.WithTimeout(ctx, l.Config.WaitForReady) + defer readyCancel() + err := l.Client.Ready(readyCtx) + if err != nil { + return fmt.Errorf("loki is not ready: %w", err) + } } ll := l.logger.WithField("websocket_url", l.lokiWebsocket) t.Go(func() error { ctx, cancel := context.WithCancel(ctx) defer cancel() respChan := l.Client.QueryRange(ctx, true) - if err != nil { - ll.Errorf("could not start loki tail: %s", err) - return fmt.Errorf("while starting loki tail: %w", err) - } for { select { case resp, ok := <-respChan: if !ok { ll.Warnf("loki channel closed") - return err + return errors.New("loki channel closed") } for _, stream := range resp.Data.Result { for _, entry := range stream.Entries { diff --git a/pkg/acquisition/modules/loki/loki_test.go b/pkg/acquisition/modules/loki/loki_test.go index cacdda32d80..643aefad715 100644 --- a/pkg/acquisition/modules/loki/loki_test.go +++ b/pkg/acquisition/modules/loki/loki_test.go @@ -34,6 +34,7 @@ func TestConfiguration(t *testing.T) { password string waitForReady time.Duration delayFor time.Duration + noReadyCheck bool testName string }{ { @@ -99,6 +100,19 @@ query: > mode: tail source: loki url: http://localhost:3100/ +no_ready_check: true +query: > + {server="demo"} +`, + expectedErr: "", + testName: "Correct config with no_ready_check", + noReadyCheck: true, + }, + { + config: ` +mode: tail +source: loki +url: http://localhost:3100/ auth: username: foo password: bar @@ -148,6 +162,8 @@ query: > t.Fatalf("Wrong DelayFor %v != %v", lokiSource.Config.DelayFor, test.delayFor) } } + + assert.Equal(t, test.noReadyCheck, lokiSource.Config.NoReadyCheck) }) } } @@ -164,6 +180,7 @@ func TestConfigureDSN(t *testing.T) { scheme string waitForReady time.Duration delayFor time.Duration + noReadyCheck bool }{ { name: "Wrong scheme", @@ -202,10 +219,11 @@ func TestConfigureDSN(t *testing.T) { }, { name: "Correct DSN", - dsn: `loki://localhost:3100/?query={server="demo"}&wait_for_ready=5s&delay_for=1s`, + dsn: `loki://localhost:3100/?query={server="demo"}&wait_for_ready=5s&delay_for=1s&no_ready_check=true`, expectedErr: "", waitForReady: 5 * time.Second, delayFor: 1 * time.Second, + noReadyCheck: true, }, { name: "SSL DSN", @@ -256,6 +274,9 @@ func TestConfigureDSN(t *testing.T) { t.Fatalf("Wrong DelayFor %v != %v", lokiSource.Config.DelayFor, test.delayFor) } } + + assert.Equal(t, test.noReadyCheck, lokiSource.Config.NoReadyCheck) + } }