Skip to content

Commit

Permalink
Merge pull request #8 from extremenetworks/telegraf_dev
Browse files Browse the repository at this point in the history
TELEGRAF-1: Telegraf blocked when using more than one pipe
  • Loading branch information
chayan-04 authored Nov 15, 2023
2 parents 5d58c03 + bed10ba commit 97f97bd
Showing 1 changed file with 44 additions and 47 deletions.
91 changes: 44 additions & 47 deletions plugins/inputs/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,72 +217,69 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
if err != nil {
t.Log.Errorf("Glob %q failed to compile: %s", filepath, err.Error())
}

for _, file := range g.Match() {
if _, ok := t.tailers[file]; ok {
// we're already tailing this file
continue
}
// create a goroutine for each "tailer"
t.wg.Add(1)

var seek *tail.SeekInfo
if !t.Pipe && !fromBeginning {
if offset, ok := t.offsets[file]; ok {
t.Log.Debugf("Using offset %d for %q", offset, file)
seek = &tail.SeekInfo{
Whence: 0,
Offset: offset,
}
} else {
seek = &tail.SeekInfo{
Whence: 2,
Offset: 0,
go func() {
defer t.wg.Done()
var seek *tail.SeekInfo
if !t.Pipe && !fromBeginning {
if offset, ok := t.offsets[file]; ok {
t.Log.Debugf("Using offset %d for %q", offset, file)
seek = &tail.SeekInfo{
Whence: 0,
Offset: offset,
}
} else {
seek = &tail.SeekInfo{
Whence: 2,
Offset: 0,
}
}
}
}

tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Follow: true,
Location: seek,
MustExist: true,
Poll: poll,
Pipe: t.Pipe,
Logger: tail.DiscardingLogger,
OpenReaderFunc: func(rd io.Reader) io.Reader {
r, _ := utfbom.Skip(t.decoder.Reader(rd))
return r
},
})

if err != nil {
t.Log.Debugf("Failed to open file (%s): %v", file, err)
continue
}

t.Log.Debugf("Tail added for %q", file)

parser, err := t.parserFunc()
if err != nil {
t.Log.Errorf("Creating parser: %s", err.Error())
continue
}
tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Follow: true,
Location: seek,
MustExist: true,
Poll: poll,
Pipe: t.Pipe,
Logger: tail.DiscardingLogger,
OpenReaderFunc: func(rd io.Reader) io.Reader {
r, _ := utfbom.Skip(t.decoder.Reader(rd))
return r
},
})

if err != nil {
t.Log.Debugf("Failed to open file (%s): %v", file, err)
return
}
t.tailers[tailer.Filename] = tailer

// create a goroutine for each "tailer"
t.wg.Add(1)
parser, err := t.parserFunc()
if err != nil {
t.Log.Errorf("Creating parser: %s", err.Error())
return
}

go func() {
defer t.wg.Done()
t.receiver(parser, tailer)

t.Log.Debugf("Tail removed for %q", tailer.Filename)

if err := tailer.Err(); err != nil {
t.Log.Errorf("Tailing %q: %s", tailer.Filename, err.Error())
}
}()

t.tailers[tailer.Filename] = tailer
}

}
return nil
}
Expand Down

0 comments on commit 97f97bd

Please sign in to comment.