Skip to content

Commit

Permalink
TELEGRAF-2: Data is not sent to output from second named pipe (#9)
Browse files Browse the repository at this point in the history
Multiple threads were being created since new files function
was getting called multiple time. A list is created to
check if the called named pipe is already being tailed.
  • Loading branch information
chayan-04 authored Dec 8, 2023
1 parent 97f97bd commit d05a0a5
Showing 1 changed file with 29 additions and 0 deletions.
29 changes: 29 additions & 0 deletions plugins/inputs/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"sync"
"time"
"container/list"

"github.com/dimchansky/utfbom"
"github.com/influxdata/tail"
Expand All @@ -29,6 +30,7 @@ const (
var (
offsets = make(map[string]int64)
offsetsMutex = new(sync.Mutex)
l = list.New();
)

type empty struct{}
Expand Down Expand Up @@ -207,6 +209,7 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {

func (t *Tail) tailNewFiles(fromBeginning bool) error {
var poll bool
var flag bool
if t.WatchMethod == "poll" {
poll = true
}
Expand All @@ -223,6 +226,21 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
// we're already tailing this file
continue
}

//Check whether tailing has started
flag = true
for e := l.Front(); e != nil; e = e.Next() {
if( e.Value == filepath) {
flag = false
}
}

if (flag == false) {
// we have already started tailing this file
continue
}
l.PushBack(filepath)

// create a goroutine for each "tailer"
t.wg.Add(1)

Expand Down Expand Up @@ -265,6 +283,8 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
}
t.tailers[tailer.Filename] = tailer

t.Log.Debugf("Tail added for %q", file)

parser, err := t.parserFunc()
if err != nil {
t.Log.Errorf("Creating parser: %s", err.Error())
Expand All @@ -274,6 +294,15 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
t.receiver(parser, tailer)
t.Log.Debugf("Tail removed for %q", tailer.Filename)

// Cleanup of the list
t.tailers[tailer.Filename] = nil;
for e := l.Front(); e != nil; e = e.Next() {
if( e.Value == filepath) {
l.Remove(e)
}
}


if err := tailer.Err(); err != nil {
t.Log.Errorf("Tailing %q: %s", tailer.Filename, err.Error())
}
Expand Down

0 comments on commit d05a0a5

Please sign in to comment.