Skip to content

Commit

Permalink
check if the acquis tomb is dying while processing logs in replay mod…
Browse files Browse the repository at this point in the history
…e for file/s3/docker (#2152)
  • Loading branch information
blotus authored Apr 4, 2023
1 parent 3132aa5 commit 0279e54
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 45 deletions.
33 changes: 19 additions & 14 deletions pkg/acquisition/modules/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,21 +311,26 @@ func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) er
scanner = bufio.NewScanner(reader)
}
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
select {
case <-t.Dying():
d.logger.Infof("Shutting down reader for container %s", containerConfig.Name)
default:
line := scanner.Text()
if line == "" {
continue
}
l := types.Line{}
l.Raw = line
l.Labels = d.Config.Labels
l.Time = time.Now().UTC()
l.Src = containerConfig.Name
l.Process = true
l.Module = d.GetName()
linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
out <- evt
d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
}
l := types.Line{}
l.Raw = line
l.Labels = d.Config.Labels
l.Time = time.Now().UTC()
l.Src = containerConfig.Name
l.Process = true
l.Module = d.GetName()
linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
out <- evt
d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
}
err = scanner.Err()
if err != nil {
Expand Down
36 changes: 21 additions & 15 deletions pkg/acquisition/modules/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,22 +514,28 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom
scanner.Buffer(buf, f.config.MaxBufferSize)
}
for scanner.Scan() {
if scanner.Text() == "" {
continue
}
l := types.Line{
Raw: scanner.Text(),
Time: time.Now().UTC(),
Src: filename,
Labels: f.config.Labels,
Process: true,
Module: f.GetName(),
}
logger.Debugf("line %s", l.Raw)
linesRead.With(prometheus.Labels{"source": filename}).Inc()
select {
case <-t.Dying():
logger.Infof("File datasource %s stopping", filename)
return nil
default:
if scanner.Text() == "" {
continue
}
l := types.Line{
Raw: scanner.Text(),
Time: time.Now().UTC(),
Src: filename,
Labels: f.config.Labels,
Process: true,
Module: f.GetName(),
}
logger.Debugf("line %s", l.Raw)
linesRead.With(prometheus.Labels{"source": filename}).Inc()

//we're reading logs at once, it must be time-machine buckets
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
//we're reading logs at once, it must be time-machine buckets
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
}
if err := scanner.Err(); err != nil {
logger.Errorf("Error while reading file: %s", err)
Expand Down
39 changes: 23 additions & 16 deletions pkg/acquisition/modules/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,23 +418,29 @@ func (s *S3Source) readFile(bucket string, key string) error {
scanner.Buffer(buf, s.Config.MaxBufferSize)
}
for scanner.Scan() {
text := scanner.Text()
logger.Tracef("Read line %s", text)
linesRead.WithLabelValues(bucket).Inc()
l := types.Line{}
l.Raw = text
l.Labels = s.Config.Labels
l.Time = time.Now().UTC()
l.Process = true
l.Module = s.GetName()
l.Src = bucket
var evt types.Event
if !s.Config.UseTimeMachine {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
select {
case <-s.t.Dying():
s.logger.Infof("Shutting down reader for %s/%s", bucket, key)
return nil
default:
text := scanner.Text()
logger.Tracef("Read line %s", text)
linesRead.WithLabelValues(bucket).Inc()
l := types.Line{}
l.Raw = text
l.Labels = s.Config.Labels
l.Time = time.Now().UTC()
l.Process = true
l.Module = s.GetName()
l.Src = bucket
var evt types.Event
if !s.Config.UseTimeMachine {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
s.out <- evt
}
s.out <- evt
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("failed to read object %s/%s: %s", bucket, key, err)
Expand Down Expand Up @@ -629,6 +635,7 @@ func (s *S3Source) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error
s.out = out
s.ctx, s.cancel = context.WithCancel(context.Background())
s.Config.UseTimeMachine = true
s.t = t
if s.Config.Key != "" {
err := s.readFile(s.Config.BucketName, s.Config.Key)
if err != nil {
Expand Down

0 comments on commit 0279e54

Please sign in to comment.