From 135fd8d35a2f0520cc11542e3da2b5914a3571d1 Mon Sep 17 00:00:00 2001 From: thediveo Date: Fri, 15 Dec 2023 13:28:40 +0100 Subject: [PATCH 01/15] fix: pipe breakage test Signed-off-by: thediveo --- pipe/checker_notwin_test.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pipe/checker_notwin_test.go b/pipe/checker_notwin_test.go index e9718af..7822319 100644 --- a/pipe/checker_notwin_test.go +++ b/pipe/checker_notwin_test.go @@ -15,17 +15,22 @@ import ( var _ = Describe("pipes", func() { - It("detects when a pipe breaks", func() { + It("detects on the write end when a pipe breaks", func() { + // As Wireshark uses a named pipe it passes an extcap its name (path) + // and then expects the extcap to open this named pipe for writing + // packet capture data into it. For this test we simulate Wireshark + // closing its reading end and we must properly detect this situation on + // our writing end of the pipe. r, w := Successful2R(os.Pipe()) - defer r.Close() + defer w.Close() go func() { GinkgoHelper() time.Sleep(2 * time.Second) - Expect(w.Close()).To(Succeed()) + Expect(r.Close()).To(Succeed()) }() start := time.Now() - WaitTillBreak(r) - Expect(time.Since(start).Milliseconds()).To(BeNumerically(">", 1500)) + WaitTillBreak(w) + Expect(time.Since(start).Milliseconds()).To(BeNumerically(">", 1900)) }) }) From 7788cb5809e9796136032f7c3ef3a2cc909aef08 Mon Sep 17 00:00:00 2001 From: thediveo Date: Fri, 15 Dec 2023 14:39:30 +0100 Subject: [PATCH 02/15] wip: reproduce macos pipe readiness problem in unit test Signed-off-by: thediveo --- pipe/checker_notwin_test.go | 45 +++++++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/pipe/checker_notwin_test.go b/pipe/checker_notwin_test.go index 7822319..f48f171 100644 --- a/pipe/checker_notwin_test.go +++ b/pipe/checker_notwin_test.go @@ -5,12 +5,14 @@ package pipe import ( + "io" "os" "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" . "github.com/thediveo/success" + "golang.org/x/sys/unix" ) var _ = Describe("pipes", func() { @@ -21,13 +23,52 @@ var _ = Describe("pipes", func() { // packet capture data into it. For this test we simulate Wireshark // closing its reading end and we must properly detect this situation on // our writing end of the pipe. - r, w := Successful2R(os.Pipe()) + By("creating a temporary named pipe/fifo and opening its ends") + tmpfifodir := Successful(os.MkdirTemp("", "test-fifo-*")) + defer os.RemoveAll(tmpfifodir) + + fifoname := tmpfifodir + "/fifo" + unix.Mkfifo(fifoname, 0660) + wch := make(chan *os.File) + go func() { + defer GinkgoRecover() + wch <- Successful(os.OpenFile(fifoname, os.O_WRONLY, os.ModeNamedPipe)) + }() + + rch := make(chan *os.File) + go func() { + defer GinkgoRecover() + rch <- Successful(os.OpenFile(fifoname, os.O_RDONLY, os.ModeNamedPipe)) + }() + + var r, w *os.File + Eventually(rch).Should(Receive(&r)) + Eventually(wch).Should(Receive(&w)) defer w.Close() + go func() { - GinkgoHelper() + defer GinkgoRecover() + By("continously draining the read end of the pipe into /dev/null") + null := Successful(os.OpenFile("/dev/null", os.O_WRONLY, 0)) + defer null.Close() + io.Copy(null, r) + By("pipe draining done") + }() + + go func() { + defer GinkgoRecover() time.Sleep(2 * time.Second) + By("closing read end of pipe") Expect(r.Close()).To(Succeed()) }() + + go func() { + defer GinkgoRecover() + time.Sleep(300 * time.Microsecond) + By("writing some data into the pipe") + w.WriteString("Wireshark rulez") + }() + start := time.Now() WaitTillBreak(w) Expect(time.Since(start).Milliseconds()).To(BeNumerically(">", 1900)) From d0160e275327c2aa05b6179ebc4f5fa0a96ad442 Mon Sep 17 00:00:00 2001 From: thediveo Date: Fri, 15 Dec 2023 15:09:40 +0100 Subject: [PATCH 03/15] test: improve reporting Signed-off-by: thediveo --- pipe/checker_notwin_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pipe/checker_notwin_test.go b/pipe/checker_notwin_test.go index f48f171..64eed2a 100644 --- a/pipe/checker_notwin_test.go +++ b/pipe/checker_notwin_test.go @@ -69,9 +69,11 @@ var _ = Describe("pipes", func() { w.WriteString("Wireshark rulez") }() + By("waiting for pipe to break") start := time.Now() WaitTillBreak(w) - Expect(time.Since(start).Milliseconds()).To(BeNumerically(">", 1900)) + Expect(time.Since(start).Milliseconds()).To( + BeNumerically(">", 1900), "pipe wasn't broken yet") }) }) From 2f45f96748e835f0fef4cf429ca27f92a6c60a33 Mon Sep 17 00:00:00 2001 From: thediveo Date: Fri, 15 Dec 2023 16:25:33 +0100 Subject: [PATCH 04/15] refactor: pipe breakage checking Signed-off-by: thediveo --- pipe/checker_notwin.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/pipe/checker_notwin.go b/pipe/checker_notwin.go index fd30f02..48959d2 100644 --- a/pipe/checker_notwin.go +++ b/pipe/checker_notwin.go @@ -21,18 +21,24 @@ import ( func WaitTillBreak(fifo *os.File) { log.Debug("constantly monitoring packet capture fifo status...") fds := unix.FdSet{} + fifoFdNo := int(fifo.Fd()) for { // Check the fifo becomming readable, which signals that it has been // closed. In this case, ex-termi-nate ;) Oh, and remember to correctly // initialize the fdset each time before calling select() ... well, just // because that's a good idea to do. :( - fds.Set(int(fifo.Fd())) - n, err := unix.Select( - int(fifo.Fd())+1, // highest fd is our file descriptor. - &fds, nil, nil, // only watch readable. + fds.Zero() + fds.Set(fifoFdNo) + _, err := unix.Select( + fifoFdNo+1, // highest fd is our file descriptor. + &fds, nil, nil, // only watch readable. nil, // no timeout, ever. ) - if n != 0 || err != nil { + if err != nil { + log.Debugf("capture fifo broken, reason: %s", err.Error()) + return + } + if fds.IsSet(fifoFdNo) { // Either the pipe was broken by Wireshark, or we did break it on // purpose in the piping process. Anyway, we're done. log.Debug("capture fifo broken, stopped monitoring.") From 7fa651f2212f61318f13e15996e5616a00eaa799 Mon Sep 17 00:00:00 2001 From: thediveo Date: Fri, 15 Dec 2023 16:57:45 +0100 Subject: [PATCH 05/15] wip: refactor to use poll Signed-off-by: thediveo --- pipe/checker_notwin.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/pipe/checker_notwin.go b/pipe/checker_notwin.go index 48959d2..c82759e 100644 --- a/pipe/checker_notwin.go +++ b/pipe/checker_notwin.go @@ -20,25 +20,30 @@ import ( // This implementation leverages [syscall.Select]. func WaitTillBreak(fifo *os.File) { log.Debug("constantly monitoring packet capture fifo status...") - fds := unix.FdSet{} - fifoFdNo := int(fifo.Fd()) + fds := []unix.PollFd{ + { + Fd: int32(fifo.Fd()), + Events: unix.POLLIN + unix.POLLERR, + }, + } for { // Check the fifo becomming readable, which signals that it has been // closed. In this case, ex-termi-nate ;) Oh, and remember to correctly // initialize the fdset each time before calling select() ... well, just // because that's a good idea to do. :( - fds.Zero() - fds.Set(fifoFdNo) - _, err := unix.Select( - fifoFdNo+1, // highest fd is our file descriptor. - &fds, nil, nil, // only watch readable. - nil, // no timeout, ever. - ) + n, err := unix.Poll(fds, 1000 /*ms*/) if err != nil { + if err == unix.EINTR { + continue + } log.Debugf("capture fifo broken, reason: %s", err.Error()) return } - if fds.IsSet(fifoFdNo) { + if n <= 0 { + continue + } + log.Debugf("poll: %+v", fds) + if fds[0].Revents&unix.POLLERR != 0 { // Either the pipe was broken by Wireshark, or we did break it on // purpose in the piping process. Anyway, we're done. log.Debug("capture fifo broken, stopped monitoring.") From 34f7938d83f0f76fc1fd46b8748b6261f606ef29 Mon Sep 17 00:00:00 2001 From: thediveo Date: Mon, 18 Dec 2023 09:47:30 +0100 Subject: [PATCH 06/15] refactor: remove unnecessary os.ModeNamedPipe Signed-off-by: thediveo --- capturestream.go | 2 +- pipe/checker_notwin_test.go | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/capturestream.go b/capturestream.go index dd1ecfb..b7ccbc3 100644 --- a/capturestream.go +++ b/capturestream.go @@ -32,7 +32,7 @@ import ( func Capture(st csharg.SharkTank) int { // Open packet stream pipe to Wireshark to feed it jucy packets... log.Debugf("fifo to Wireshark %s", wireshark.FifoPath) - fifo, err := os.OpenFile(wireshark.FifoPath, os.O_WRONLY, os.ModeNamedPipe) + fifo, err := os.OpenFile(wireshark.FifoPath, os.O_WRONLY, 0) if err != nil { log.Errorf("cannot open fifo: %s", err.Error()) return 1 diff --git a/pipe/checker_notwin_test.go b/pipe/checker_notwin_test.go index 64eed2a..c2c0ac8 100644 --- a/pipe/checker_notwin_test.go +++ b/pipe/checker_notwin_test.go @@ -9,10 +9,11 @@ import ( "os" "time" + "golang.org/x/sys/unix" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" . "github.com/thediveo/success" - "golang.org/x/sys/unix" ) var _ = Describe("pipes", func() { @@ -32,13 +33,13 @@ var _ = Describe("pipes", func() { wch := make(chan *os.File) go func() { defer GinkgoRecover() - wch <- Successful(os.OpenFile(fifoname, os.O_WRONLY, os.ModeNamedPipe)) + wch <- Successful(os.OpenFile(fifoname, os.O_WRONLY, 0)) }() rch := make(chan *os.File) go func() { defer GinkgoRecover() - rch <- Successful(os.OpenFile(fifoname, os.O_RDONLY, os.ModeNamedPipe)) + rch <- Successful(os.OpenFile(fifoname, os.O_RDONLY, 0)) }() var r, w *os.File @@ -73,7 +74,7 @@ var _ = Describe("pipes", func() { start := time.Now() WaitTillBreak(w) Expect(time.Since(start).Milliseconds()).To( - BeNumerically(">", 1900), "pipe wasn't broken yet") + BeNumerically(">", 1900), "false positive: pipe wasn't broken yet") }) }) From 8d0280c6f34a71be68794c8baf88a0eaa4ee7c04 Mon Sep 17 00:00:00 2001 From: thediveo Date: Mon, 18 Dec 2023 10:27:24 +0100 Subject: [PATCH 07/15] refactor: add context to pipe break detection; use timeout in pipe break unit test Signed-off-by: thediveo --- capturestream.go | 3 ++- pipe/checker_notwin.go | 21 ++++++++++++++------- pipe/checker_notwin_test.go | 17 +++++++++++++++-- 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/capturestream.go b/capturestream.go index b7ccbc3..8ee46f6 100644 --- a/capturestream.go +++ b/capturestream.go @@ -13,6 +13,7 @@ package cshargextcap import ( + "context" "os" "strings" @@ -70,7 +71,7 @@ func Capture(st csharg.SharkTank) int { // might be idle for long times and thus we would otherwise not notice that // Wireshark has already stopped capturing. go func() { - pipe.WaitTillBreak(fifo) + pipe.WaitTillBreak(context.Background(), fifo) cs.Stop() }() // ...and finally wait for the packet capture to terminate (or getting diff --git a/pipe/checker_notwin.go b/pipe/checker_notwin.go index c82759e..16e615b 100644 --- a/pipe/checker_notwin.go +++ b/pipe/checker_notwin.go @@ -7,6 +7,7 @@ package pipe import ( + "context" "os" "golang.org/x/sys/unix" @@ -14,16 +15,17 @@ import ( log "github.com/sirupsen/logrus" ) -// WaitTillBreak continuously checks a fifo/pipe to see when it breaks. When -// called, WaitTillBreak blocks until the fifo/pipe finally has broken. +// WaitTillBreak continuously checks a fifo/pipe's producer end (writing end) to +// see when it breaks. When called, WaitTillBreak blocks until the fifo/pipe +// finally has broken. It also returns when the passed context is done. // -// This implementation leverages [syscall.Select]. -func WaitTillBreak(fifo *os.File) { +// This implementation leverages [unix.Poll]. +func WaitTillBreak(ctx context.Context, fifo *os.File) { log.Debug("constantly monitoring packet capture fifo status...") fds := []unix.PollFd{ { Fd: int32(fifo.Fd()), - Events: unix.POLLIN + unix.POLLERR, + Events: 0, // we're interested only in POLLERR and that is ignored here anyway. }, } for { @@ -31,7 +33,13 @@ func WaitTillBreak(fifo *os.File) { // closed. In this case, ex-termi-nate ;) Oh, and remember to correctly // initialize the fdset each time before calling select() ... well, just // because that's a good idea to do. :( - n, err := unix.Poll(fds, 1000 /*ms*/) + n, err := unix.Poll(fds, 100 /* ms */) + select { + case <-ctx.Done(): + log.Debug("context done while monitoring packet capture fifo") + return + default: + } if err != nil { if err == unix.EINTR { continue @@ -42,7 +50,6 @@ func WaitTillBreak(fifo *os.File) { if n <= 0 { continue } - log.Debugf("poll: %+v", fds) if fds[0].Revents&unix.POLLERR != 0 { // Either the pipe was broken by Wireshark, or we did break it on // purpose in the piping process. Anyway, we're done. diff --git a/pipe/checker_notwin_test.go b/pipe/checker_notwin_test.go index c2c0ac8..6a522b9 100644 --- a/pipe/checker_notwin_test.go +++ b/pipe/checker_notwin_test.go @@ -5,6 +5,7 @@ package pipe import ( + "context" "io" "os" "time" @@ -13,12 +14,21 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + . "github.com/onsi/gomega/gleak" . "github.com/thediveo/success" ) var _ = Describe("pipes", func() { - It("detects on the write end when a pipe breaks", func() { + BeforeEach(func() { + goodgos := Goroutines() + DeferCleanup(func() { + Eventually(Goroutines).Within(2 * time.Second).ProbeEvery(100 * time.Millisecond). + ShouldNot(HaveLeaked(goodgos)) + }) + }) + + It("detects on the write end when a pipe breaks", func(ctx context.Context) { // As Wireshark uses a named pipe it passes an extcap its name (path) // and then expects the extcap to open this named pipe for writing // packet capture data into it. For this test we simulate Wireshark @@ -71,8 +81,11 @@ var _ = Describe("pipes", func() { }() By("waiting for pipe to break") + ctx, cancel := context.WithTimeout(ctx, 4*time.Second) + defer cancel() start := time.Now() - WaitTillBreak(w) + WaitTillBreak(ctx, w) + Expect(ctx.Err()).To(BeNil(), "break detection failed") Expect(time.Since(start).Milliseconds()).To( BeNumerically(">", 1900), "false positive: pipe wasn't broken yet") }) From 27241eb4f6cf93924c645172d6923acf19a82ef5 Mon Sep 17 00:00:00 2001 From: thediveo Date: Wed, 20 Dec 2023 13:14:16 +0100 Subject: [PATCH 08/15] refactor: poll-based pipe monitoring Signed-off-by: thediveo --- pipe/checker_notwin.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/pipe/checker_notwin.go b/pipe/checker_notwin.go index 16e615b..5afdd43 100644 --- a/pipe/checker_notwin.go +++ b/pipe/checker_notwin.go @@ -22,24 +22,29 @@ import ( // This implementation leverages [unix.Poll]. func WaitTillBreak(ctx context.Context, fifo *os.File) { log.Debug("constantly monitoring packet capture fifo status...") - fds := []unix.PollFd{ - { - Fd: int32(fifo.Fd()), - Events: 0, // we're interested only in POLLERR and that is ignored here anyway. - }, - } for { - // Check the fifo becomming readable, which signals that it has been - // closed. In this case, ex-termi-nate ;) Oh, and remember to correctly - // initialize the fdset each time before calling select() ... well, just - // because that's a good idea to do. :( - n, err := unix.Poll(fds, 100 /* ms */) select { case <-ctx.Done(): log.Debug("context done while monitoring packet capture fifo") return default: } + // Check the fifo becomming readable, which signals that it has been + // closed. In this case, ex-termi-nate ;) Oh, and remember to correctly + // initialize the fdset each time before calling select() ... well, just + // because that's a good idea to do. :( + fd := fifo.Fd() // n.b. a closed *os.File returns a -1 fd. + if fd == ^uintptr(0) { + log.Debug("stopping packet capture fifo monitoring, as write end has been closed") + return + } + fds := []unix.PollFd{ + { + Fd: int32(fd), + Events: 0, // we're interested only in POLLERR and that is ignored here anyway. + }, + } + n, err := unix.Poll(fds, 100 /* ms */) if err != nil { if err == unix.EINTR { continue From 54679249c0b8a3f166d23b22c8e8e643e2592e6a Mon Sep 17 00:00:00 2001 From: thediveo Date: Wed, 20 Dec 2023 14:25:58 +0100 Subject: [PATCH 09/15] fix: add context support to pipe checker on Windows Signed-off-by: thediveo --- pipe/checker_windows.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pipe/checker_windows.go b/pipe/checker_windows.go index 5d76b37..78508a2 100644 --- a/pipe/checker_windows.go +++ b/pipe/checker_windows.go @@ -7,6 +7,7 @@ package pipe import ( + "context" "os" "syscall" "time" @@ -23,12 +24,15 @@ import ( // unsynchronized concurrent writes are a really bad idea, but in this case // we're not really writing anything, but just poking things to see if they're // dead already. -func WaitTillBreak(fifo *os.File) { +func WaitTillBreak(ctx context.Context, fifo *os.File) { log.Debug("constantly monitoring packet capture fifo status...") nothing := []byte{} - ticker := time.NewTicker(1 * time.Second) + ticker := time.NewTicker(100 * time.Millisecond) for { select { + case <-ctx.Done(): + log.Debug("context done while monitoring packet capture fifo") + return case <-ticker.C: // Avoid the usual higher level writes, because of their // optimizations. While at this time the Windows writer From cc2f01bec35cf2ed5a4d87e65073598fe8d485bc Mon Sep 17 00:00:00 2001 From: thediveo Date: Wed, 20 Dec 2023 14:26:35 +0100 Subject: [PATCH 10/15] ci: build darwin AMD64 binary Signed-off-by: thediveo --- .goreleaser.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 2ae315d..53f05e2 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -3,7 +3,6 @@ dist: ./dist before: hooks: - #- go mod tidy - go generate . - ./scripts/go-winres.sh make --in winres.json --product-version=git-tag --file-version=git-tag --arch amd64 @@ -14,12 +13,13 @@ builds: targets: - linux_amd64_v1 - linux_arm64 + - darwin_amd64 - darwin_arm64 tags: - netgo - osusergo ldflags: - - 's -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.Date}} -X main.builtBy=goreleaser' + - '-s -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.Date}} -X main.builtBy=goreleaser' - id: windows main: ./cmd/cshargextcap From ee7ba92694a73d2c2003502fae7e36cf7e759a12 Mon Sep 17 00:00:00 2001 From: thediveo Date: Wed, 20 Dec 2023 14:55:56 +0100 Subject: [PATCH 11/15] chore: update readme to mention release for macos on amd64 Signed-off-by: thediveo --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f6f88ab..2b9fca2 100644 --- a/README.md +++ b/README.md @@ -66,8 +66,8 @@ Confirm and we're live capturing. - **Mac OS**: head over to our [releases](https://github.com/siemens/cshargextcap/releases/latest) page and - download the `.tar.gz` archive for Darwin arm64. Extract the contained - `cshargextcap` plugin binary and copy/move it to + download the `.tar.gz` archive for Darwin/macos arm64 or amd64 (Intel). + Extract the contained `cshargextcap` plugin binary and copy/move it to `/Applications/Wireshark.app/Contents/MacOS/extcap`. - **Windows**: head over to our From f4c8c567a8c79f9d8b63a0551aafa91d6fc4d4dc Mon Sep 17 00:00:00 2001 From: thediveo Date: Sat, 23 Dec 2023 15:03:50 +0100 Subject: [PATCH 12/15] refactor: pipe unit test, pipe check usage Signed-off-by: thediveo --- capturestream.go | 8 ++++++-- pipe/checker_notwin.go | 26 ++++++++++++-------------- pipe/checker_notwin_test.go | 21 +++++++++++++-------- pipe/package_test.go | 1 + 4 files changed, 32 insertions(+), 24 deletions(-) diff --git a/capturestream.go b/capturestream.go index 8ee46f6..b6c5ba8 100644 --- a/capturestream.go +++ b/capturestream.go @@ -64,14 +64,18 @@ func Capture(st csharg.SharkTank) int { log.Errorf("cannot start capture: %s", err.Error()) return 1 } - defer cs.Stop() // be overly careful + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + cs.Stop() // be overly careful + }() // Always keep an eye on the fifo getting closed by Wireshark: we then need // to stop the capture stream. This is necessary because the capture stream // might be idle for long times and thus we would otherwise not notice that // Wireshark has already stopped capturing. go func() { - pipe.WaitTillBreak(context.Background(), fifo) + pipe.WaitTillBreak(ctx, fifo) cs.Stop() }() // ...and finally wait for the packet capture to terminate (or getting diff --git a/pipe/checker_notwin.go b/pipe/checker_notwin.go index 5afdd43..6652e96 100644 --- a/pipe/checker_notwin.go +++ b/pipe/checker_notwin.go @@ -16,12 +16,19 @@ import ( ) // WaitTillBreak continuously checks a fifo/pipe's producer end (writing end) to -// see when it breaks. When called, WaitTillBreak blocks until the fifo/pipe -// finally has broken. It also returns when the passed context is done. +// see when it breaks. When called, WaitTillBreak blocks until the fifo/named +// pipe finally has “broken”; that is, the reading end has been closed. +// WaitTillBreak also returns when the passed context is done. // // This implementation leverages [unix.Poll]. func WaitTillBreak(ctx context.Context, fifo *os.File) { log.Debug("constantly monitoring packet capture fifo status...") + fifofd, err := unix.Dup(int(fifo.Fd())) + if err != nil { + log.Debugf("cannot duplicate packet capture fifo file descriptor, reason: %s", err.Error()) + return + } + defer unix.Close(fifofd) for { select { case <-ctx.Done(): @@ -29,19 +36,10 @@ func WaitTillBreak(ctx context.Context, fifo *os.File) { return default: } - // Check the fifo becomming readable, which signals that it has been - // closed. In this case, ex-termi-nate ;) Oh, and remember to correctly - // initialize the fdset each time before calling select() ... well, just - // because that's a good idea to do. :( - fd := fifo.Fd() // n.b. a closed *os.File returns a -1 fd. - if fd == ^uintptr(0) { - log.Debug("stopping packet capture fifo monitoring, as write end has been closed") - return - } fds := []unix.PollFd{ { - Fd: int32(fd), - Events: 0, // we're interested only in POLLERR and that is ignored here anyway. + Fd: int32(fifofd), + Events: unix.POLLHUP, // we're interested only in POLLERR and that is ignored on input anyway. }, } n, err := unix.Poll(fds, 100 /* ms */) @@ -49,7 +47,7 @@ func WaitTillBreak(ctx context.Context, fifo *os.File) { if err == unix.EINTR { continue } - log.Debugf("capture fifo broken, reason: %s", err.Error()) + log.Debugf("pipe polling failed, reason: %s", err.Error()) return } if n <= 0 { diff --git a/pipe/checker_notwin_test.go b/pipe/checker_notwin_test.go index 6a522b9..9b13c65 100644 --- a/pipe/checker_notwin_test.go +++ b/pipe/checker_notwin_test.go @@ -39,17 +39,22 @@ var _ = Describe("pipes", func() { defer os.RemoveAll(tmpfifodir) fifoname := tmpfifodir + "/fifo" - unix.Mkfifo(fifoname, 0660) - wch := make(chan *os.File) + Expect(unix.Mkfifo(fifoname, 0600)).To(Succeed()) + + // Open both ends of the named pipe, once for reading and once for + // writing. As this is a rendevouz operation, we run the two open + // operations concurrently and proceed after we've succeeded on both + // ends. + rch := make(chan *os.File) go func() { defer GinkgoRecover() - wch <- Successful(os.OpenFile(fifoname, os.O_WRONLY, 0)) + rch <- Successful(os.OpenFile(fifoname, os.O_RDONLY, 0)) }() - rch := make(chan *os.File) + wch := make(chan *os.File) go func() { defer GinkgoRecover() - rch <- Successful(os.OpenFile(fifoname, os.O_RDONLY, 0)) + wch <- Successful(os.OpenFile(fifoname, os.O_WRONLY, 0)) }() var r, w *os.File @@ -59,11 +64,11 @@ var _ = Describe("pipes", func() { go func() { defer GinkgoRecover() - By("continously draining the read end of the pipe into /dev/null") + By("continously draining the read end of the pipe into /dev/null...") null := Successful(os.OpenFile("/dev/null", os.O_WRONLY, 0)) defer null.Close() io.Copy(null, r) - By("pipe draining done") + By("...pipe draining done") }() go func() { @@ -75,7 +80,7 @@ var _ = Describe("pipes", func() { go func() { defer GinkgoRecover() - time.Sleep(300 * time.Microsecond) + time.Sleep(500 * time.Microsecond) By("writing some data into the pipe") w.WriteString("Wireshark rulez") }() diff --git a/pipe/package_test.go b/pipe/package_test.go index eaa254f..d52821f 100644 --- a/pipe/package_test.go +++ b/pipe/package_test.go @@ -14,6 +14,7 @@ import ( ) func TestContainerSharkExtCap(t *testing.T) { + log.SetOutput(GinkgoWriter) log.SetLevel(log.DebugLevel) RegisterFailHandler(Fail) From 2a3c7337a21e8105f5fe6d945489863030ba7f10 Mon Sep 17 00:00:00 2001 From: thediveo Date: Sat, 23 Dec 2023 16:10:29 +0100 Subject: [PATCH 13/15] refactor: handle SIGINT/SIGTERM Signed-off-by: thediveo --- .gitignore | 2 ++ capturestream.go | 50 ++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index fd6babc..07b6371 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ *.syso coverage.* __debug_bin +/cshargextcap +/cshargextcap.exe diff --git a/capturestream.go b/capturestream.go index b6c5ba8..8a853bc 100644 --- a/capturestream.go +++ b/capturestream.go @@ -15,24 +15,32 @@ package cshargextcap import ( "context" "os" + "os/signal" + "runtime" "strings" "github.com/siemens/csharg" "github.com/siemens/cshargextcap/cli/target" "github.com/siemens/cshargextcap/cli/wireshark" "github.com/siemens/cshargextcap/pipe" + "golang.org/x/sys/unix" log "github.com/sirupsen/logrus" ) -// Capture is the workhorse: it opens the pipe (fifo) offered by Wireshark, then -// starts a new Capture stream using the given SharkTank client and container -// target description. Then it lets csharg pump all packet Capture data arriving -// from the underlying websocket connected to the Capture service into the -// Wireshark pipe. +// Capture is the workhorse: it opens the named pipe (fifo) offered by +// Wireshark, then starts a new Capture stream using the given SharkTank client +// and container target description. Then it lets csharg pump all packet Capture +// data arriving from the underlying websocket connected to the capture service +// into the Wireshark pipe. func Capture(st csharg.SharkTank) int { + if runtime.GOOS != "windows" { + defer func() { + signal.Reset(unix.SIGINT, unix.SIGTERM) + }() + } // Open packet stream pipe to Wireshark to feed it jucy packets... - log.Debugf("fifo to Wireshark %s", wireshark.FifoPath) + log.Debugf("opening fifo to Wireshark %s", wireshark.FifoPath) fifo, err := os.OpenFile(wireshark.FifoPath, os.O_WRONLY, 0) if err != nil { log.Errorf("cannot open fifo: %s", err.Error()) @@ -65,6 +73,36 @@ func Capture(st csharg.SharkTank) int { return 1 } + // Wireshark on unix systems sends SIGINT upon stopping a capture and + // SIGTERM upon wanting to quit. We here use Debug logs as otherwise + // Wireshark will report the logging as errors to the user. We only accept + // that in case of a fatal abort when catching one of the signals twice or + // one after the other. + sigs := make(chan os.Signal, 1) + go func() { + fatal := false + for sig := range sigs { + switch sig { + case unix.SIGINT: + log.Debug("received SIGINT") + case unix.SIGTERM: + log.Debug("received SIGTERM") + } + if fatal { + // twice a signal --> immediate abort + log.Fatal("aborting") + } + fatal = true + log.Debug("shutting down capture stream") + go func() { + cs.Stop() // blocks, and is also idempotent. + }() + } + }() + if runtime.GOOS != "windows" { + signal.Notify(sigs, unix.SIGINT, unix.SIGTERM) + } + ctx, cancel := context.WithCancel(context.Background()) defer func() { cancel() From ef6e9dc7bd8e81ff7b34437a2959ef1bc77c09e1 Mon Sep 17 00:00:00 2001 From: thediveo Date: Tue, 26 Dec 2023 17:04:12 +0100 Subject: [PATCH 14/15] refactor: remove pipe close monitoring, rely solely on Wireshark sending signals Signed-off-by: thediveo --- .gitignore | 1 + .goreleaser.yaml | 2 +- Makefile | 8 ++- capturestream.go | 39 ++++++--------- pipe/checker_notwin.go | 63 ------------------------ pipe/checker_notwin_test.go | 98 ------------------------------------- pipe/checker_windows.go | 52 -------------------- pipe/doc.go | 4 -- pipe/package_test.go | 22 --------- 9 files changed, 24 insertions(+), 265 deletions(-) delete mode 100644 pipe/checker_notwin.go delete mode 100644 pipe/checker_notwin_test.go delete mode 100644 pipe/checker_windows.go delete mode 100644 pipe/doc.go delete mode 100644 pipe/package_test.go diff --git a/.gitignore b/.gitignore index 07b6371..ce14d4f 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ coverage.* __debug_bin /cshargextcap /cshargextcap.exe +*.log diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 53f05e2..e0c5b3a 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -30,7 +30,7 @@ builds: - netgo - osusergo ldflags: - - 's -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.Date}} -X main.builtBy=goreleaser' + - '-s -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.Date}} -X main.builtBy=goreleaser' hooks: post: - cmd: packaging/windows/post.sh {{ .Path }} diff --git a/Makefile b/Makefile index 04f1166..bb6183d 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ SHELL:=/bin/bash GOGEN:=go generate . BUILDTAGS:="osusergo,netgo" -.PHONY: help clean dist pkgsite report run vuln +.PHONY: help clean cshargextcap dist pkgsite report run vuln help: ## list available targets @# Derived from Gomega's Makefile (github.com/onsi/gomega) under MIT License @@ -21,6 +21,12 @@ dist: ## build snapshot cshargextcap binary packages+archives in dist/ and test @ls -lh dist/cshargextcap_* @echo "🏁 done" +cshargextcap: ## build local extcap binary + go build \ + -tags netgo,osusergo \ + -ldflags "-s -w" \ + ./cmd/cshargextcap + clean: ## cleans up build and testing artefacts rm -rf dist find . -name __debug_bin -delete diff --git a/capturestream.go b/capturestream.go index 8a853bc..69a0dad 100644 --- a/capturestream.go +++ b/capturestream.go @@ -13,16 +13,13 @@ package cshargextcap import ( - "context" "os" "os/signal" - "runtime" "strings" "github.com/siemens/csharg" "github.com/siemens/cshargextcap/cli/target" "github.com/siemens/cshargextcap/cli/wireshark" - "github.com/siemens/cshargextcap/pipe" "golang.org/x/sys/unix" log "github.com/sirupsen/logrus" @@ -34,11 +31,16 @@ import ( // data arriving from the underlying websocket connected to the capture service // into the Wireshark pipe. func Capture(st csharg.SharkTank) int { - if runtime.GOOS != "windows" { - defer func() { - signal.Reset(unix.SIGINT, unix.SIGTERM) - }() - } + // While Wireshark (and Tshark) currently send SIGTERM (and maybe SIGINT in + // some situations, maybe when using a control pipe which we don't) only on + // unix systems, there are developer discussions to in the future send + // events to a Windows extcap. As Go maps such events to its signal API + // we're already now unconditionally handling SIGINT and SIGTERM in the hope + // that we're future-proof. + defer func() { + signal.Reset(unix.SIGINT, unix.SIGTERM) + }() + // Open packet stream pipe to Wireshark to feed it jucy packets... log.Debugf("opening fifo to Wireshark %s", wireshark.FifoPath) fifo, err := os.OpenFile(wireshark.FifoPath, os.O_WRONLY, 0) @@ -99,23 +101,12 @@ func Capture(st csharg.SharkTank) int { }() } }() - if runtime.GOOS != "windows" { - signal.Notify(sigs, unix.SIGINT, unix.SIGTERM) - } + // As mentioned above, we unconditionally handle SIGINT and SIGTERM on all + // platforms. While this is currently not needed on Windows, some day it + // might become alive. + signal.Notify(sigs, unix.SIGINT, unix.SIGTERM) - ctx, cancel := context.WithCancel(context.Background()) - defer func() { - cancel() - cs.Stop() // be overly careful - }() - // Always keep an eye on the fifo getting closed by Wireshark: we then need - // to stop the capture stream. This is necessary because the capture stream - // might be idle for long times and thus we would otherwise not notice that - // Wireshark has already stopped capturing. - go func() { - pipe.WaitTillBreak(ctx, fifo) - cs.Stop() - }() + defer cs.Stop() // be overly careful // ...and finally wait for the packet capture to terminate (or getting // ex-term-inated). cs.Wait() diff --git a/pipe/checker_notwin.go b/pipe/checker_notwin.go deleted file mode 100644 index 6652e96..0000000 --- a/pipe/checker_notwin.go +++ /dev/null @@ -1,63 +0,0 @@ -// (c) Siemens AG 2023 -// -// SPDX-License-Identifier: MIT - -//go:build !windows - -package pipe - -import ( - "context" - "os" - - "golang.org/x/sys/unix" - - log "github.com/sirupsen/logrus" -) - -// WaitTillBreak continuously checks a fifo/pipe's producer end (writing end) to -// see when it breaks. When called, WaitTillBreak blocks until the fifo/named -// pipe finally has “broken”; that is, the reading end has been closed. -// WaitTillBreak also returns when the passed context is done. -// -// This implementation leverages [unix.Poll]. -func WaitTillBreak(ctx context.Context, fifo *os.File) { - log.Debug("constantly monitoring packet capture fifo status...") - fifofd, err := unix.Dup(int(fifo.Fd())) - if err != nil { - log.Debugf("cannot duplicate packet capture fifo file descriptor, reason: %s", err.Error()) - return - } - defer unix.Close(fifofd) - for { - select { - case <-ctx.Done(): - log.Debug("context done while monitoring packet capture fifo") - return - default: - } - fds := []unix.PollFd{ - { - Fd: int32(fifofd), - Events: unix.POLLHUP, // we're interested only in POLLERR and that is ignored on input anyway. - }, - } - n, err := unix.Poll(fds, 100 /* ms */) - if err != nil { - if err == unix.EINTR { - continue - } - log.Debugf("pipe polling failed, reason: %s", err.Error()) - return - } - if n <= 0 { - continue - } - if fds[0].Revents&unix.POLLERR != 0 { - // Either the pipe was broken by Wireshark, or we did break it on - // purpose in the piping process. Anyway, we're done. - log.Debug("capture fifo broken, stopped monitoring.") - return - } - } -} diff --git a/pipe/checker_notwin_test.go b/pipe/checker_notwin_test.go deleted file mode 100644 index 9b13c65..0000000 --- a/pipe/checker_notwin_test.go +++ /dev/null @@ -1,98 +0,0 @@ -// (c) Siemens AG 2023 -// -// SPDX-License-Identifier: MIT - -package pipe - -import ( - "context" - "io" - "os" - "time" - - "golang.org/x/sys/unix" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - . "github.com/onsi/gomega/gleak" - . "github.com/thediveo/success" -) - -var _ = Describe("pipes", func() { - - BeforeEach(func() { - goodgos := Goroutines() - DeferCleanup(func() { - Eventually(Goroutines).Within(2 * time.Second).ProbeEvery(100 * time.Millisecond). - ShouldNot(HaveLeaked(goodgos)) - }) - }) - - It("detects on the write end when a pipe breaks", func(ctx context.Context) { - // As Wireshark uses a named pipe it passes an extcap its name (path) - // and then expects the extcap to open this named pipe for writing - // packet capture data into it. For this test we simulate Wireshark - // closing its reading end and we must properly detect this situation on - // our writing end of the pipe. - By("creating a temporary named pipe/fifo and opening its ends") - tmpfifodir := Successful(os.MkdirTemp("", "test-fifo-*")) - defer os.RemoveAll(tmpfifodir) - - fifoname := tmpfifodir + "/fifo" - Expect(unix.Mkfifo(fifoname, 0600)).To(Succeed()) - - // Open both ends of the named pipe, once for reading and once for - // writing. As this is a rendevouz operation, we run the two open - // operations concurrently and proceed after we've succeeded on both - // ends. - rch := make(chan *os.File) - go func() { - defer GinkgoRecover() - rch <- Successful(os.OpenFile(fifoname, os.O_RDONLY, 0)) - }() - - wch := make(chan *os.File) - go func() { - defer GinkgoRecover() - wch <- Successful(os.OpenFile(fifoname, os.O_WRONLY, 0)) - }() - - var r, w *os.File - Eventually(rch).Should(Receive(&r)) - Eventually(wch).Should(Receive(&w)) - defer w.Close() - - go func() { - defer GinkgoRecover() - By("continously draining the read end of the pipe into /dev/null...") - null := Successful(os.OpenFile("/dev/null", os.O_WRONLY, 0)) - defer null.Close() - io.Copy(null, r) - By("...pipe draining done") - }() - - go func() { - defer GinkgoRecover() - time.Sleep(2 * time.Second) - By("closing read end of pipe") - Expect(r.Close()).To(Succeed()) - }() - - go func() { - defer GinkgoRecover() - time.Sleep(500 * time.Microsecond) - By("writing some data into the pipe") - w.WriteString("Wireshark rulez") - }() - - By("waiting for pipe to break") - ctx, cancel := context.WithTimeout(ctx, 4*time.Second) - defer cancel() - start := time.Now() - WaitTillBreak(ctx, w) - Expect(ctx.Err()).To(BeNil(), "break detection failed") - Expect(time.Since(start).Milliseconds()).To( - BeNumerically(">", 1900), "false positive: pipe wasn't broken yet") - }) - -}) diff --git a/pipe/checker_windows.go b/pipe/checker_windows.go deleted file mode 100644 index 78508a2..0000000 --- a/pipe/checker_windows.go +++ /dev/null @@ -1,52 +0,0 @@ -// (c) Siemens AG 2023 -// -// SPDX-License-Identifier: MIT - -//go:build windows - -package pipe - -import ( - "context" - "os" - "syscall" - "time" - - log "github.com/sirupsen/logrus" -) - -// WaitTillBreak continuously checks a fifo/pipe to see when it breaks. When -// called, PipeChecker blocks until the fifo/pipe finally has broken. -// -// As the Windows platform lacks a generally useful [syscall.Select] -// implementation that can also handle pipes. Instead, we will try to write 0 -// octets at regular intervals to see if the pipe is broken. Usually, -// unsynchronized concurrent writes are a really bad idea, but in this case -// we're not really writing anything, but just poking things to see if they're -// dead already. -func WaitTillBreak(ctx context.Context, fifo *os.File) { - log.Debug("constantly monitoring packet capture fifo status...") - nothing := []byte{} - ticker := time.NewTicker(100 * time.Millisecond) - for { - select { - case <-ctx.Done(): - log.Debug("context done while monitoring packet capture fifo") - return - case <-ticker.C: - // Avoid the usual higher level writes, because of their - // optimizations. While at this time the Windows writer - // seems to write even zero-length data, we cannot be sure - // this will hold for all future. So dive down into the - // syscall basement to have full control. - n, err := syscall.Write(syscall.Handle(fifo.Fd()), nothing) - if n != 0 || err != nil { - // Either the pipe was broken by Wireshark, or we - // did break it on purpose in the piping process. - // Anyway, we're done. - log.Debug("capture fifo broken, stopped monitoring.") - return - } - } - } -} diff --git a/pipe/doc.go b/pipe/doc.go deleted file mode 100644 index f099c26..0000000 --- a/pipe/doc.go +++ /dev/null @@ -1,4 +0,0 @@ -/* -Package pipe implements waiting for a fifo/pipe to break. -*/ -package pipe diff --git a/pipe/package_test.go b/pipe/package_test.go deleted file mode 100644 index d52821f..0000000 --- a/pipe/package_test.go +++ /dev/null @@ -1,22 +0,0 @@ -// (c) Siemens AG 2023 -// -// SPDX-License-Identifier: MIT - -package pipe - -import ( - "testing" - - log "github.com/sirupsen/logrus" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -func TestContainerSharkExtCap(t *testing.T) { - log.SetOutput(GinkgoWriter) - log.SetLevel(log.DebugLevel) - - RegisterFailHandler(Fail) - RunSpecs(t, "cshargextcap/pipe") -} From eafa1187a03bd76ab392576089532d2fee706b89 Mon Sep 17 00:00:00 2001 From: thediveo Date: Tue, 26 Dec 2023 17:43:51 +0100 Subject: [PATCH 15/15] fix: use syscall signal names which are defined also on Windows Signed-off-by: thediveo --- capturestream.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/capturestream.go b/capturestream.go index 69a0dad..0f9ed4f 100644 --- a/capturestream.go +++ b/capturestream.go @@ -16,11 +16,11 @@ import ( "os" "os/signal" "strings" + "syscall" "github.com/siemens/csharg" "github.com/siemens/cshargextcap/cli/target" "github.com/siemens/cshargextcap/cli/wireshark" - "golang.org/x/sys/unix" log "github.com/sirupsen/logrus" ) @@ -38,7 +38,7 @@ func Capture(st csharg.SharkTank) int { // we're already now unconditionally handling SIGINT and SIGTERM in the hope // that we're future-proof. defer func() { - signal.Reset(unix.SIGINT, unix.SIGTERM) + signal.Reset(syscall.SIGINT, syscall.SIGTERM) }() // Open packet stream pipe to Wireshark to feed it jucy packets... @@ -85,9 +85,9 @@ func Capture(st csharg.SharkTank) int { fatal := false for sig := range sigs { switch sig { - case unix.SIGINT: + case syscall.SIGINT: log.Debug("received SIGINT") - case unix.SIGTERM: + case syscall.SIGTERM: log.Debug("received SIGTERM") } if fatal { @@ -104,7 +104,7 @@ func Capture(st csharg.SharkTank) int { // As mentioned above, we unconditionally handle SIGINT and SIGTERM on all // platforms. While this is currently not needed on Windows, some day it // might become alive. - signal.Notify(sigs, unix.SIGINT, unix.SIGTERM) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) defer cs.Stop() // be overly careful // ...and finally wait for the packet capture to terminate (or getting