Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loki: add no_ready_check option #3317

Merged
merged 4 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 30 additions & 14 deletions pkg/acquisition/modules/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type LokiConfiguration struct {
WaitForReady time.Duration `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds
Auth LokiAuthConfiguration `yaml:"auth"`
MaxFailureDuration time.Duration `yaml:"max_failure_duration"` // Max duration of failure before stopping the source
NoReadyCheck bool `yaml:"no_ready_check"` // Bypass /ready check before starting
configuration.DataSourceCommonCfg `yaml:",inline"`
}

Expand Down Expand Up @@ -229,6 +230,14 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger
l.logger.Logger.SetLevel(level)
}

if noReadyCheck := params.Get("no_ready_check"); noReadyCheck != "" {
noReadyCheck, err := strconv.ParseBool(noReadyCheck)
if err != nil {
return fmt.Errorf("invalid no_ready_check in dsn: %w", err)
}
l.Config.NoReadyCheck = noReadyCheck
}

l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host)
if u.User != nil {
l.Config.Auth.Username = u.User.Username()
Expand Down Expand Up @@ -264,26 +273,28 @@ func (l *LokiSource) GetName() string {
func (l *LokiSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
l.logger.Debug("Loki one shot acquisition")
l.Client.SetTomb(t)
readyCtx, cancel := context.WithTimeout(ctx, l.Config.WaitForReady)
defer cancel()
err := l.Client.Ready(readyCtx)
if err != nil {
return fmt.Errorf("loki is not ready: %w", err)

if !l.Config.NoReadyCheck {
readyCtx, readyCancel := context.WithTimeout(ctx, l.Config.WaitForReady)
defer readyCancel()
err := l.Client.Ready(readyCtx)
if err != nil {
return fmt.Errorf("loki is not ready: %w", err)
}
}

ctx, cancel = context.WithCancel(ctx)
c := l.Client.QueryRange(ctx, false)
lokiCtx, cancel := context.WithCancel(ctx)
defer cancel()
c := l.Client.QueryRange(lokiCtx, false)

for {
select {
case <-t.Dying():
l.logger.Debug("Loki one shot acquisition stopped")
cancel()
return nil
case resp, ok := <-c:
if !ok {
l.logger.Info("Loki acquisition done, chan closed")
cancel()
return nil
}
for _, stream := range resp.Data.Result {
Expand Down Expand Up @@ -313,12 +324,17 @@ func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]stri
}

func (l *LokiSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
var err error
LaurenceJJones marked this conversation as resolved.
Show resolved Hide resolved

l.Client.SetTomb(t)
readyCtx, cancel := context.WithTimeout(ctx, l.Config.WaitForReady)
defer cancel()
err := l.Client.Ready(readyCtx)
if err != nil {
return fmt.Errorf("loki is not ready: %w", err)

if !l.Config.NoReadyCheck {
readyCtx, readyCancel := context.WithTimeout(ctx, l.Config.WaitForReady)
defer readyCancel()
err := l.Client.Ready(readyCtx)
if err != nil {
return fmt.Errorf("loki is not ready: %w", err)
}
}
ll := l.logger.WithField("websocket_url", l.lokiWebsocket)
t.Go(func() error {
Expand Down
23 changes: 22 additions & 1 deletion pkg/acquisition/modules/loki/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestConfiguration(t *testing.T) {
password string
waitForReady time.Duration
delayFor time.Duration
noReadyCheck bool
testName string
}{
{
Expand Down Expand Up @@ -99,6 +100,19 @@ query: >
mode: tail
source: loki
url: http://localhost:3100/
no_ready_check: true
query: >
{server="demo"}
`,
expectedErr: "",
testName: "Correct config with no_ready_check",
noReadyCheck: true,
},
{
config: `
mode: tail
source: loki
url: http://localhost:3100/
auth:
username: foo
password: bar
Expand Down Expand Up @@ -148,6 +162,8 @@ query: >
t.Fatalf("Wrong DelayFor %v != %v", lokiSource.Config.DelayFor, test.delayFor)
}
}

assert.Equal(t, test.noReadyCheck, lokiSource.Config.NoReadyCheck)
})
}
}
Expand All @@ -164,6 +180,7 @@ func TestConfigureDSN(t *testing.T) {
scheme string
waitForReady time.Duration
delayFor time.Duration
noReadyCheck bool
}{
{
name: "Wrong scheme",
Expand Down Expand Up @@ -202,10 +219,11 @@ func TestConfigureDSN(t *testing.T) {
},
{
name: "Correct DSN",
dsn: `loki://localhost:3100/?query={server="demo"}&wait_for_ready=5s&delay_for=1s`,
dsn: `loki://localhost:3100/?query={server="demo"}&wait_for_ready=5s&delay_for=1s&no_ready_check=true`,
expectedErr: "",
waitForReady: 5 * time.Second,
delayFor: 1 * time.Second,
noReadyCheck: true,
},
{
name: "SSL DSN",
Expand Down Expand Up @@ -256,6 +274,9 @@ func TestConfigureDSN(t *testing.T) {
t.Fatalf("Wrong DelayFor %v != %v", lokiSource.Config.DelayFor, test.delayFor)
}
}

assert.Equal(t, test.noReadyCheck, lokiSource.Config.NoReadyCheck)

}
}

Expand Down
Loading