Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

file acquis: add mutex to protect access to the internal tail map #2878

Merged
merged 2 commits into from
Mar 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/acquisition/modules/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/fsnotify/fsnotify"
Expand Down Expand Up @@ -52,6 +53,7 @@ type FileSource struct {
logger *log.Entry
files []string
exclude_regexps []*regexp.Regexp
tailMapMutex *sync.RWMutex
}

func (f *FileSource) GetUuid() string {
Expand Down Expand Up @@ -105,6 +107,7 @@ func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry) error {
}

f.watchedDirectories = make(map[string]bool)
f.tailMapMutex = &sync.RWMutex{}
f.tails = make(map[string]bool)

f.watcher, err = fsnotify.NewWatcher()
Expand Down Expand Up @@ -350,7 +353,9 @@ func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
f.logger.Errorf("Could not start tailing file %s : %s", file, err)
continue
}
f.tailMapMutex.Lock()
f.tails[file] = true
f.tailMapMutex.Unlock()
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/file/live/fsnotify")
return f.tailFile(out, t, tail)
Expand Down Expand Up @@ -412,11 +417,14 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
continue
}

f.tailMapMutex.RLock()
if f.tails[event.Name] {
f.tailMapMutex.RUnlock()
//we already have a tail on it, do not start a new one
logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
break
}
f.tailMapMutex.RUnlock()
//cf. https://github.com/crowdsecurity/crowdsec/issues/1168
//do not rely on stat, reclose file immediately as it's opened by Tail
fd, err := os.Open(event.Name)
Expand Down Expand Up @@ -453,7 +461,9 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
break
}
f.tailMapMutex.Lock()
f.tails[event.Name] = true
f.tailMapMutex.Unlock()
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/tailfile")
return f.tailFile(out, t, tail)
Expand Down
Loading