Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loki: add no_ready_check option #3317

Merged
merged 4 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 29 additions & 19 deletions pkg/acquisition/modules/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
WaitForReady time.Duration `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds
Auth LokiAuthConfiguration `yaml:"auth"`
MaxFailureDuration time.Duration `yaml:"max_failure_duration"` // Max duration of failure before stopping the source
NoReadyCheck bool `yaml:"no_ready_check"` // Bypass /ready check before starting
configuration.DataSourceCommonCfg `yaml:",inline"`
}

Expand Down Expand Up @@ -229,6 +230,14 @@
l.logger.Logger.SetLevel(level)
}

if noReadyCheck := params.Get("no_ready_check"); noReadyCheck != "" {
noReadyCheck, err := strconv.ParseBool(noReadyCheck)
if err != nil {
return fmt.Errorf("invalid no_ready_check in dsn: %w", err)
}

Check warning on line 237 in pkg/acquisition/modules/loki/loki.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/loki/loki.go#L236-L237

Added lines #L236 - L237 were not covered by tests
l.Config.NoReadyCheck = noReadyCheck
}

l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host)
if u.User != nil {
l.Config.Auth.Username = u.User.Username()
Expand Down Expand Up @@ -264,26 +273,28 @@
func (l *LokiSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
l.logger.Debug("Loki one shot acquisition")
l.Client.SetTomb(t)
readyCtx, cancel := context.WithTimeout(ctx, l.Config.WaitForReady)
defer cancel()
err := l.Client.Ready(readyCtx)
if err != nil {
return fmt.Errorf("loki is not ready: %w", err)

if !l.Config.NoReadyCheck {
readyCtx, readyCancel := context.WithTimeout(ctx, l.Config.WaitForReady)
defer readyCancel()
err := l.Client.Ready(readyCtx)
if err != nil {
return fmt.Errorf("loki is not ready: %w", err)
}

Check warning on line 283 in pkg/acquisition/modules/loki/loki.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/loki/loki.go#L282-L283

Added lines #L282 - L283 were not covered by tests
}

ctx, cancel = context.WithCancel(ctx)
c := l.Client.QueryRange(ctx, false)
lokiCtx, cancel := context.WithCancel(ctx)
defer cancel()
c := l.Client.QueryRange(lokiCtx, false)

for {
select {
case <-t.Dying():
l.logger.Debug("Loki one shot acquisition stopped")
cancel()
return nil
case resp, ok := <-c:
if !ok {
l.logger.Info("Loki acquisition done, chan closed")
cancel()
return nil
}
for _, stream := range resp.Data.Result {
Expand Down Expand Up @@ -314,27 +325,26 @@

func (l *LokiSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
l.Client.SetTomb(t)
readyCtx, cancel := context.WithTimeout(ctx, l.Config.WaitForReady)
defer cancel()
err := l.Client.Ready(readyCtx)
if err != nil {
return fmt.Errorf("loki is not ready: %w", err)

if !l.Config.NoReadyCheck {
readyCtx, readyCancel := context.WithTimeout(ctx, l.Config.WaitForReady)
defer readyCancel()
err := l.Client.Ready(readyCtx)
if err != nil {
return fmt.Errorf("loki is not ready: %w", err)
}
}
ll := l.logger.WithField("websocket_url", l.lokiWebsocket)
t.Go(func() error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
respChan := l.Client.QueryRange(ctx, true)
if err != nil {
ll.Errorf("could not start loki tail: %s", err)
return fmt.Errorf("while starting loki tail: %w", err)
}
for {
select {
case resp, ok := <-respChan:
if !ok {
ll.Warnf("loki channel closed")
return err
return errors.New("loki channel closed")

Check warning on line 347 in pkg/acquisition/modules/loki/loki.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/loki/loki.go#L347

Added line #L347 was not covered by tests
}
for _, stream := range resp.Data.Result {
for _, entry := range stream.Entries {
Expand Down
23 changes: 22 additions & 1 deletion pkg/acquisition/modules/loki/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestConfiguration(t *testing.T) {
password string
waitForReady time.Duration
delayFor time.Duration
noReadyCheck bool
testName string
}{
{
Expand Down Expand Up @@ -99,6 +100,19 @@ query: >
mode: tail
source: loki
url: http://localhost:3100/
no_ready_check: true
query: >
{server="demo"}
`,
expectedErr: "",
testName: "Correct config with no_ready_check",
noReadyCheck: true,
},
{
config: `
mode: tail
source: loki
url: http://localhost:3100/
auth:
username: foo
password: bar
Expand Down Expand Up @@ -148,6 +162,8 @@ query: >
t.Fatalf("Wrong DelayFor %v != %v", lokiSource.Config.DelayFor, test.delayFor)
}
}

assert.Equal(t, test.noReadyCheck, lokiSource.Config.NoReadyCheck)
})
}
}
Expand All @@ -164,6 +180,7 @@ func TestConfigureDSN(t *testing.T) {
scheme string
waitForReady time.Duration
delayFor time.Duration
noReadyCheck bool
}{
{
name: "Wrong scheme",
Expand Down Expand Up @@ -202,10 +219,11 @@ func TestConfigureDSN(t *testing.T) {
},
{
name: "Correct DSN",
dsn: `loki://localhost:3100/?query={server="demo"}&wait_for_ready=5s&delay_for=1s`,
dsn: `loki://localhost:3100/?query={server="demo"}&wait_for_ready=5s&delay_for=1s&no_ready_check=true`,
expectedErr: "",
waitForReady: 5 * time.Second,
delayFor: 1 * time.Second,
noReadyCheck: true,
},
{
name: "SSL DSN",
Expand Down Expand Up @@ -256,6 +274,9 @@ func TestConfigureDSN(t *testing.T) {
t.Fatalf("Wrong DelayFor %v != %v", lokiSource.Config.DelayFor, test.delayFor)
}
}

assert.Equal(t, test.noReadyCheck, lokiSource.Config.NoReadyCheck)

}
}

Expand Down