Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove effectless pipe close monitoring #11

Merged
merged 11 commits into from
Dec 26, 2023
4 changes: 2 additions & 2 deletions .goreleaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ dist: ./dist

before:
hooks:
#- go mod tidy
- go generate .
- ./scripts/go-winres.sh make --in winres.json --product-version=git-tag --file-version=git-tag --arch amd64

Expand All @@ -14,12 +13,13 @@ builds:
targets:
- linux_amd64_v1
- linux_arm64
- darwin_amd64
- darwin_arm64
tags:
- netgo
- osusergo
ldflags:
- 's -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.Date}} -X main.builtBy=goreleaser'
- '-s -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.Date}} -X main.builtBy=goreleaser'

- id: windows
main: ./cmd/cshargextcap
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ Confirm and we're live capturing.

- **Mac OS**: head over to our
[releases](https://github.com/siemens/cshargextcap/releases/latest) page and
download the `.tar.gz` archive for Darwin arm64. Extract the contained
`cshargextcap` plugin binary and copy/move it to
download the `.tar.gz` archive for Darwin/macos arm64 or amd64 (Intel).
Extract the contained `cshargextcap` plugin binary and copy/move it to
`/Applications/Wireshark.app/Contents/MacOS/extcap`.

- **Windows**: head over to our
Expand Down
5 changes: 3 additions & 2 deletions capturestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package cshargextcap

import (
"context"
"os"
"strings"

Expand All @@ -32,7 +33,7 @@ import (
func Capture(st csharg.SharkTank) int {
// Open packet stream pipe to Wireshark to feed it jucy packets...
log.Debugf("fifo to Wireshark %s", wireshark.FifoPath)
fifo, err := os.OpenFile(wireshark.FifoPath, os.O_WRONLY, os.ModeNamedPipe)
fifo, err := os.OpenFile(wireshark.FifoPath, os.O_WRONLY, 0)
if err != nil {
log.Errorf("cannot open fifo: %s", err.Error())
return 1
Expand Down Expand Up @@ -70,7 +71,7 @@ func Capture(st csharg.SharkTank) int {
// might be idle for long times and thus we would otherwise not notice that
// Wireshark has already stopped capturing.
go func() {
pipe.WaitTillBreak(fifo)
pipe.WaitTillBreak(context.Background(), fifo)
cs.Stop()
}()
// ...and finally wait for the packet capture to terminate (or getting
Expand Down
47 changes: 35 additions & 12 deletions pipe/checker_notwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,55 @@
package pipe

import (
"context"
"os"

"golang.org/x/sys/unix"

log "github.com/sirupsen/logrus"
)

// WaitTillBreak continuously checks a fifo/pipe to see when it breaks. When
// called, WaitTillBreak blocks until the fifo/pipe finally has broken.
// WaitTillBreak continuously checks a fifo/pipe's producer end (writing end) to
// see when it breaks. When called, WaitTillBreak blocks until the fifo/pipe
// finally has broken. It also returns when the passed context is done.
//
// This implementation leverages [syscall.Select].
func WaitTillBreak(fifo *os.File) {
// This implementation leverages [unix.Poll].
func WaitTillBreak(ctx context.Context, fifo *os.File) {
log.Debug("constantly monitoring packet capture fifo status...")
fds := unix.FdSet{}
for {
select {
case <-ctx.Done():
log.Debug("context done while monitoring packet capture fifo")
return
default:
}
// Check the fifo becomming readable, which signals that it has been
// closed. In this case, ex-termi-nate ;) Oh, and remember to correctly
// initialize the fdset each time before calling select() ... well, just
// because that's a good idea to do. :(
fds.Set(int(fifo.Fd()))
n, err := unix.Select(
int(fifo.Fd())+1, // highest fd is our file descriptor.
&fds, nil, nil, // only watch readable.
nil, // no timeout, ever.
)
if n != 0 || err != nil {
fd := fifo.Fd() // n.b. a closed *os.File returns a -1 fd.
if fd == ^uintptr(0) {
log.Debug("stopping packet capture fifo monitoring, as write end has been closed")
return
}
fds := []unix.PollFd{
{
Fd: int32(fd),
Events: 0, // we're interested only in POLLERR and that is ignored here anyway.
},
}
n, err := unix.Poll(fds, 100 /* ms */)
if err != nil {
if err == unix.EINTR {
continue
}
log.Debugf("capture fifo broken, reason: %s", err.Error())
return
}
if n <= 0 {
continue
}
if fds[0].Revents&unix.POLLERR != 0 {
// Either the pipe was broken by Wireshark, or we did break it on
// purpose in the piping process. Anyway, we're done.
log.Debug("capture fifo broken, stopped monitoring.")
Expand Down
76 changes: 69 additions & 7 deletions pipe/checker_notwin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,89 @@
package pipe

import (
"context"
"io"
"os"
"time"

"golang.org/x/sys/unix"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gleak"
. "github.com/thediveo/success"
)

var _ = Describe("pipes", func() {

It("detects when a pipe breaks", func() {
r, w := Successful2R(os.Pipe())
defer r.Close()
BeforeEach(func() {
goodgos := Goroutines()
DeferCleanup(func() {
Eventually(Goroutines).Within(2 * time.Second).ProbeEvery(100 * time.Millisecond).
ShouldNot(HaveLeaked(goodgos))
})
})

It("detects on the write end when a pipe breaks", func(ctx context.Context) {
// As Wireshark uses a named pipe it passes an extcap its name (path)
// and then expects the extcap to open this named pipe for writing
// packet capture data into it. For this test we simulate Wireshark
// closing its reading end and we must properly detect this situation on
// our writing end of the pipe.
By("creating a temporary named pipe/fifo and opening its ends")
tmpfifodir := Successful(os.MkdirTemp("", "test-fifo-*"))
defer os.RemoveAll(tmpfifodir)

fifoname := tmpfifodir + "/fifo"
unix.Mkfifo(fifoname, 0660)
wch := make(chan *os.File)
go func() {
defer GinkgoRecover()
wch <- Successful(os.OpenFile(fifoname, os.O_WRONLY, 0))
}()

rch := make(chan *os.File)
go func() {
defer GinkgoRecover()
rch <- Successful(os.OpenFile(fifoname, os.O_RDONLY, 0))
}()

var r, w *os.File
Eventually(rch).Should(Receive(&r))
Eventually(wch).Should(Receive(&w))
defer w.Close()

go func() {
defer GinkgoRecover()
By("continously draining the read end of the pipe into /dev/null")
null := Successful(os.OpenFile("/dev/null", os.O_WRONLY, 0))
defer null.Close()
io.Copy(null, r)
By("pipe draining done")
}()

go func() {
GinkgoHelper()
defer GinkgoRecover()
time.Sleep(2 * time.Second)
Expect(w.Close()).To(Succeed())
By("closing read end of pipe")
Expect(r.Close()).To(Succeed())
}()

go func() {
defer GinkgoRecover()
time.Sleep(300 * time.Microsecond)
By("writing some data into the pipe")
w.WriteString("Wireshark rulez")
}()

By("waiting for pipe to break")
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()
start := time.Now()
WaitTillBreak(r)
Expect(time.Since(start).Milliseconds()).To(BeNumerically(">", 1500))
WaitTillBreak(ctx, w)
Expect(ctx.Err()).To(BeNil(), "break detection failed")
Expect(time.Since(start).Milliseconds()).To(
BeNumerically(">", 1900), "false positive: pipe wasn't broken yet")
})

})
8 changes: 6 additions & 2 deletions pipe/checker_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package pipe

import (
"context"
"os"
"syscall"
"time"
Expand All @@ -23,12 +24,15 @@ import (
// unsynchronized concurrent writes are a really bad idea, but in this case
// we're not really writing anything, but just poking things to see if they're
// dead already.
func WaitTillBreak(fifo *os.File) {
func WaitTillBreak(ctx context.Context, fifo *os.File) {
log.Debug("constantly monitoring packet capture fifo status...")
nothing := []byte{}
ticker := time.NewTicker(1 * time.Second)
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ctx.Done():
log.Debug("context done while monitoring packet capture fifo")
return
case <-ticker.C:
// Avoid the usual higher level writes, because of their
// optimizations. While at this time the Windows writer
Expand Down
Loading