From 0601a02d7931dd997bb0d0f6765adad71a7e8d9e Mon Sep 17 00:00:00 2001 From: jayjiahua <553544693@qq.com> Date: Fri, 23 Jun 2023 21:39:49 +0800 Subject: [PATCH 1/3] feat: add Removed method for log file obj --- filebeat/input/log/file.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/filebeat/input/log/file.go b/filebeat/input/log/file.go index 8c513e0e9d8..59d408c87ad 100644 --- a/filebeat/input/log/file.go +++ b/filebeat/input/log/file.go @@ -17,7 +17,11 @@ package log -import "os" +import ( + "os" + + "github.com/elastic/beats/libbeat/common/file" +) type File struct { *os.File @@ -25,3 +29,4 @@ type File struct { func (File) Continuable() bool { return true } func (File) HasState() bool { return true } +func (f File) Removed() bool { return file.IsRemoved(f.File) } From 6971699cd05e0d209e48b8108fbad50ab188d221 Mon Sep 17 00:00:00 2001 From: jayjiahua <553544693@qq.com> Date: Fri, 23 Jun 2023 22:08:02 +0800 Subject: [PATCH 2/3] feat: add Removed method for Source interface --- filebeat/harvester/source.go | 1 + filebeat/input/log/log.go | 5 +--- filebeat/input/log/stdin.go | 1 + libbeat/common/file/file_other.go | 12 +++++++++ libbeat/common/file/file_windows.go | 39 +++++++++++++++++++++++++++++ 5 files changed, 54 insertions(+), 4 deletions(-) diff --git a/filebeat/harvester/source.go b/filebeat/harvester/source.go index 6c739ccacce..f7e3c0c6405 100644 --- a/filebeat/harvester/source.go +++ b/filebeat/harvester/source.go @@ -25,6 +25,7 @@ import ( type Source interface { io.ReadCloser Name() string + Removed() bool // check if source has been removed Stat() (os.FileInfo, error) Continuable() bool // can we continue processing after EOF? HasState() bool // does this source have a state? diff --git a/filebeat/input/log/log.go b/filebeat/input/log/log.go index ba13d91bbda..d0c14c45b81 100644 --- a/filebeat/input/log/log.go +++ b/filebeat/input/log/log.go @@ -152,10 +152,7 @@ func (f *Log) errorChecks(err error) error { if f.config.CloseRemoved { // Check if the file name exists. See https://github.com/elastic/filebeat/issues/93 - _, statErr := os.Stat(f.fs.Name()) - - // Error means file does not exist. - if statErr != nil { + if f.fs.Removed() { return ErrRemoved } } diff --git a/filebeat/input/log/stdin.go b/filebeat/input/log/stdin.go index 9e3d2998841..b511c1ae517 100644 --- a/filebeat/input/log/stdin.go +++ b/filebeat/input/log/stdin.go @@ -44,3 +44,4 @@ func (p Pipe) Name() string { return p.File.Name() } func (p Pipe) Stat() (os.FileInfo, error) { return p.File.Stat() } func (p Pipe) Continuable() bool { return false } func (p Pipe) HasState() bool { return false } +func (p Pipe) Removed() bool { return false } diff --git a/libbeat/common/file/file_other.go b/libbeat/common/file/file_other.go index 1ac4ecb2cd4..271223b2a8b 100644 --- a/libbeat/common/file/file_other.go +++ b/libbeat/common/file/file_other.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +//go:build !windows // +build !windows package file @@ -62,3 +63,14 @@ func ReadOpen(path string) (*os.File, error) { perm := os.FileMode(0) return os.OpenFile(path, flag, perm) } + +// IsRemoved checks wheter the file held by f is removed. +func IsRemoved(f *os.File) bool { + stat, err := f.Stat() + if err != nil { + // if we got an error from a Stat call just assume we are removed + return true + } + sysStat := stat.Sys().(*syscall.Stat_t) + return sysStat.Nlink == 0 +} diff --git a/libbeat/common/file/file_windows.go b/libbeat/common/file/file_windows.go index 6572fa78862..c5a4d6ef364 100644 --- a/libbeat/common/file/file_windows.go +++ b/libbeat/common/file/file_windows.go @@ -23,6 +23,9 @@ import ( "reflect" "strconv" "syscall" + "unsafe" + + "golang.org/x/sys/windows" ) type StateOS struct { @@ -31,6 +34,12 @@ type StateOS struct { Vol uint64 `json:"vol,"` } +var ( + modkernel32 = windows.NewLazySystemDLL("kernel32.dll") + + procGetFileInformationByHandleEx = modkernel32.NewProc("GetFileInformationByHandleEx") +) + // GetOSState returns the platform specific StateOS func GetOSState(info os.FileInfo) StateOS { // os.SameFile must be called to populate the id fields. Otherwise in case for example @@ -107,3 +116,33 @@ func ReadOpen(path string) (*os.File, error) { return os.NewFile(uintptr(handle), path), nil } + +// IsRemoved checks wheter the file held by f is removed. +// On Windows IsRemoved reads the DeletePending flags using the GetFileInformationByHandleEx. +// A file is not removed/unlinked as long as at least one process still own a +// file handle. A delete file is only marked as deleted, and file attributes +// can still be read. Only opening a file marked with 'DeletePending' will +// fail. +func IsRemoved(f *os.File) bool { + hdl := f.Fd() + if hdl == uintptr(syscall.InvalidHandle) { + return false + } + + info := struct { + AllocationSize int64 + EndOfFile int64 + NumberOfLinks int32 + DeletePending bool + Directory bool + }{} + infoSz := unsafe.Sizeof(info) + + const class = 1 // FileStandardInfo + r1, _, _ := syscall.Syscall6( + procGetFileInformationByHandleEx.Addr(), 4, uintptr(hdl), class, uintptr(unsafe.Pointer(&info)), infoSz, 0, 0) + if r1 == 0 { + return true // assume file is removed if syscall errors + } + return info.DeletePending +} From 77b7134a2f9c6af6fdc6a9bc8fcd6ebc3735c1ab Mon Sep 17 00:00:00 2001 From: jayjiahua <553544693@qq.com> Date: Wed, 28 Jun 2023 11:16:42 +0800 Subject: [PATCH 3/3] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E6=97=81?= =?UTF-8?q?=E8=B7=AF=E6=A3=80=E6=B5=8B=E6=96=87=E4=BB=B6=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E5=B9=B6=E5=8F=8A=E6=97=B6=E9=87=8A=E6=94=BEfd?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- filebeat/input/log/harvester.go | 46 ++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 585496d0776..10b9fad1634 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -18,14 +18,15 @@ // Package log harvests different inputs for new information. Currently // two harvester types exist: // -// * log -// * stdin +// - log // -// The log harvester reads a file line by line. In case the end of a file is found -// with an incomplete line, the line pointer stays at the beginning of the incomplete -// line. As soon as the line is completed, it is read and returned. +// - stdin // -// The stdin harvesters reads data from stdin. +// The log harvester reads a file line by line. In case the end of a file is found +// with an incomplete line, the line pointer stays at the beginning of the incomplete +// line. As soon as the line is completed, it is read and returned. +// +// The stdin harvesters reads data from stdin. package log import ( @@ -198,12 +199,33 @@ func (h *Harvester) Run() error { closeTimeout = time.After(h.config.CloseTimeout) } - select { - // Applies when timeout is reached - case <-closeTimeout: - logp.Info("Closing harvester because close_timeout was reached: %s", source) - // Required when reader loop returns and reader finished - case <-h.done: + removedCheckTick := make(<-chan time.Time) + if h.config.CloseRemoved { + removedCheckTick = time.After(h.config.ScanFrequency) + } + + L: + for { + select { + // Applies when timeout is reached + case <-closeTimeout: + logp.Info("Closing harvester because close_timeout was reached: %s", source) + break L + // Check whether file is removed + case <-removedCheckTick: + // 通过旁路判断文件是否被删除,避免输出堵塞时未能执行 errorChecks 导致已删文件的 fd 没有被及时释放的问题 + if h.reader.fileReader.log.fs.Removed() { + logp.Info("Closing harvester because file was removed: %s", source) + break L + } else { + logp.Debug("harvester", "File was not removed: %s, check again after %v", source, h.config.ScanFrequency) + // update timer + removedCheckTick = time.After(h.config.ScanFrequency) + } + // Required when reader loop returns and reader finished + case <-h.done: + break L + } } h.stop()