Skip to content

Commit

Permalink
Merge pull request #17 from liuwenping/master
Browse files Browse the repository at this point in the history
minor: release FileHarvester in time when the configuration is deleted
  • Loading branch information
liuwenping authored Feb 21, 2022
2 parents 673310f + 0639ce6 commit 47eede5
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
3 changes: 3 additions & 0 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ func (h *Harvester) Run() error {
}

h.stop()

// Close reader
h.reader.Stop()
}(h.state.Source)

logp.Info("Harvester started for file: %s, offset: %d", h.state.Source, h.state.Offset)
Expand Down
14 changes: 13 additions & 1 deletion filebeat/input/log/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type ReuseHarvester struct {
Config config
State file.State
done chan struct{}
closeOnce sync.Once
fileReader *FileHarvester
message chan ReuseMessage
}
Expand Down Expand Up @@ -129,7 +130,9 @@ func (r *ReuseHarvester) OnMessage(message ReuseMessage) error {

//Stop: 停止harvester
func (r *ReuseHarvester) Stop() {
close(r.done)
r.closeOnce.Do(func() {
close(r.done)
})
}

//HasState
Expand Down Expand Up @@ -373,6 +376,15 @@ func (h *FileHarvester) Run() {
// read file
h.readerDone.Add(1)
go h.loopRead()
} else {
for _, reuseReader := range h.forwarders {
select {
case <-reuseReader.done:
logp.Info("forwarder is done, delete forwarder(%s)", reuseReader.HarvesterID)
delete(h.forwarders, reuseReader.HarvesterID)
default:
}
}
}

if len(h.forwarders) > 0 {
Expand Down

0 comments on commit 47eede5

Please sign in to comment.