From 6ceb3ecb60b49a7879efd056f6ff2bd3ea912852 Mon Sep 17 00:00:00 2001 From: LEI WANG Date: Mon, 13 May 2024 21:29:17 +0800 Subject: [PATCH] v0.4.2: fd watcher: walk driver Signed-off-by: LEI WANG --- go.mod | 3 +- go.sum | 4 +- pkg/listener/fd/ebpf/watcher.go | 14 +++- pkg/listener/fd/event/event.go | 15 +++- pkg/listener/fd/fd.go | 32 -------- pkg/listener/fd/leak.go | 49 ------------ pkg/listener/fd/listener.go | 2 +- pkg/listener/fd/stat/stat.go | 30 +++++-- pkg/listener/fd/walk/watcher.go | 64 +++++++++------ pkg/listener/fd/watcher.go | 26 +++++- pkg/listener/fd/worker.go | 136 +++++++++++--------------------- 11 files changed, 159 insertions(+), 216 deletions(-) delete mode 100644 pkg/listener/fd/fd.go delete mode 100644 pkg/listener/fd/leak.go diff --git a/go.mod b/go.mod index 65238fd..8bf3727 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21.6 require ( github.com/ctrsploit/sploit-spec v0.4.3 github.com/fatih/color v1.15.0 - github.com/slimtoolkit/slim v0.0.0-20240505211522-04aa51412d9f + github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 github.com/ssst0n3/awesome_libs v0.6.7 github.com/urfave/cli/v2 v2.27.1 github.com/vishvananda/netlink v1.2.1-beta.2 @@ -18,6 +18,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/stretchr/testify v1.8.4 // indirect github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae // indirect github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e // indirect golang.org/x/sys v0.15.0 // indirect diff --git a/go.sum b/go.sum index e927aa7..535f39b 100644 --- a/go.sum +++ b/go.sum @@ -26,6 +26,8 @@ github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPn github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw= +github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -35,8 +37,6 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/slimtoolkit/slim v0.0.0-20240505211522-04aa51412d9f h1:CzxuqUzdfDc7ycF4FJqL5x3Dn6Ay3+3CsDVeH0TYwRU= -github.com/slimtoolkit/slim v0.0.0-20240505211522-04aa51412d9f/go.mod h1:55lB/fY9qINNkJDRnG2qWdU1/B99MazwBBMhz0sdSnc= github.com/ssst0n3/awesome_libs v0.6.7 h1:CxhRcWy/v1THpy4Jk8AJh4jMnfYE+5iyrkfV5HL5nYQ= github.com/ssst0n3/awesome_libs v0.6.7/go.mod h1:+JQKcgjs0TWWFszGXRzIs+8pZaL0qRf6HEhPY5n8cEk= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/pkg/listener/fd/ebpf/watcher.go b/pkg/listener/fd/ebpf/watcher.go index e5219db..46d44c5 100644 --- a/pkg/listener/fd/ebpf/watcher.go +++ b/pkg/listener/fd/ebpf/watcher.go @@ -5,7 +5,8 @@ import ( ) type Watcher struct { - msg chan Message + msg chan Message + stop chan struct{} } func New(pid int) *Watcher { @@ -16,7 +17,6 @@ func New(pid int) *Watcher { func (w Watcher) Init() { w.register() - // receive IPC message go w.receive() } @@ -24,25 +24,31 @@ func (w Watcher) Enable() (enabled bool) { return } +// register to kernel func (w Watcher) register() { // register with os.GetPid() } +// receive IPC message func (w Watcher) receive() { for { w.msg <- Message{} } } -func (w Watcher) Watch(stop <-chan struct{}, event chan<- event.Events) { +func (w Watcher) Watch(event chan<- event.Events) { select { - case <-stop: + case <-w.stop: return default: w.do(event) } } +func (w Watcher) Close() { + w.stop <- struct{}{} +} + func (w Watcher) do(e chan<- event.Events) { for { msg := <-w.msg diff --git a/pkg/listener/fd/event/event.go b/pkg/listener/fd/event/event.go index ca60a25..180a9af 100644 --- a/pkg/listener/fd/event/event.go +++ b/pkg/listener/fd/event/event.go @@ -13,7 +13,20 @@ type Event struct { Type int Pid int Fd int - Stat stat.Stat + Stat *stat.Stat } type Events []Event + +func (e Events) Map(m map[int]*stat.Stat) { + for _, event := range e { + switch event.Type { + case Open, Change: + m[event.Fd] = event.Stat + case Close: + delete(m, event.Fd) + default: + panic("unhandled default case") + } + } +} diff --git a/pkg/listener/fd/fd.go b/pkg/listener/fd/fd.go deleted file mode 100644 index 7972467..0000000 --- a/pkg/listener/fd/fd.go +++ /dev/null @@ -1,32 +0,0 @@ -package fd - -import ( - "fmt" - "github.com/fatih/color" - "github.com/ssst0n3/fd-listener/pkg" -) - -type Stat struct { - FdPath string - RealPath string - SocketPath string - Leak bool - Flags int64 -} - -func (s Stat) String() (content string) { - leaked := "" - if s.Leak { - leaked = "leaked!" - } - flags := pkg.ParseFlags(s.Flags) - var socketPath string - if s.SocketPath != "" { - socketPath = " -> " + s.SocketPath - } - content = fmt.Sprintf("%s -> %s%s\t; %s\t%s", s.FdPath, s.RealPath, socketPath, leaked, flags) - if s.Leak { - content = color.RedString(content) - } - return -} diff --git a/pkg/listener/fd/leak.go b/pkg/listener/fd/leak.go deleted file mode 100644 index 0f65f0d..0000000 --- a/pkg/listener/fd/leak.go +++ /dev/null @@ -1,49 +0,0 @@ -package fd - -import ( - "fmt" - "os" - "strings" - "syscall" -) - -func Leak(fdPath, realPath string) (leak bool, err error) { - fdFI, err := os.Stat(fdPath) - if err != nil { - return - } - fdStat, _ := fdFI.Sys().(*syscall.Stat_t) - realFI, err := os.Stat(realPath) - if err != nil { - if os.IsNotExist(err) { - // stat anon_inode:[eventpoll]: no such file or directory - err = nil - } - return - } - realStat, _ := realFI.Sys().(*syscall.Stat_t) - if fdStat.Ino == realStat.Ino { - leak = true - } - return -} - -func Socket(pid int, realPath string) (socketPath string, err error) { - if !strings.Contains(realPath, "socket:[") { - return - } - id := strings.TrimSuffix(strings.TrimPrefix(realPath, "socket:["), "]") - content, err := os.ReadFile(fmt.Sprintf("/proc/%d/net/unix", pid)) - if err != nil { - return - } - socket := strings.Split(string(content), "\n") - for _, line := range socket { - if strings.Contains(line, id) { - data := strings.Split(line, " ") - socketPath = data[len(data)-1] - return - } - } - return -} diff --git a/pkg/listener/fd/listener.go b/pkg/listener/fd/listener.go index 4f7b482..7ebe186 100644 --- a/pkg/listener/fd/listener.go +++ b/pkg/listener/fd/listener.go @@ -25,7 +25,7 @@ func (l *Listener) Handle() { case event.ProcessExit: worker := l.workers[e.Pid] if worker != nil { - worker.Stop <- true + worker.Close() delete(l.workers, e.Pid) } default: diff --git a/pkg/listener/fd/stat/stat.go b/pkg/listener/fd/stat/stat.go index df97575..696108e 100644 --- a/pkg/listener/fd/stat/stat.go +++ b/pkg/listener/fd/stat/stat.go @@ -13,15 +13,16 @@ type Stat struct { SocketPath string Leak bool Flags int64 + Changed bool } -func New(pid, fd int) (stat Stat, err error) { +func New(pid, fd int) (stat *Stat, err error) { fdPath := fmt.Sprintf("/proc/%d/fd/%d", pid, fd) realPath, _ := os.Readlink(fdPath) if realPath == "" { realPath = "?" } - stat = Stat{ + stat = &Stat{ FdPath: fdPath, RealPath: realPath, } @@ -43,19 +44,32 @@ func New(pid, fd int) (stat Stat, err error) { return } -func (s Stat) String() (content string) { +func (s *Stat) Change(c bool) { + s.Changed = c +} + +func (s *Stat) String() (content string) { + fdPath := s.FdPath + if s.Changed { + fdPath = color.New(color.Underline).Sprint(fdPath) + } leaked := "" if s.Leak { - leaked = "leaked!" + leaked = color.RedString("leaked!") } flags := pkg.ParseFlags(s.Flags) var socketPath string if s.SocketPath != "" { socketPath = " -> " + s.SocketPath } - content = fmt.Sprintf("%s -> %s%s\t; %s\t%s", s.FdPath, s.RealPath, socketPath, leaked, flags) - if s.Leak { - content = color.RedString(content) - } + content = fmt.Sprintf("%s -> %s%s\t; %s\t%s", fdPath, s.RealPath, socketPath, leaked, flags) return } + +func (s *Stat) Equals(s2 *Stat) bool { + return s.FdPath == s2.FdPath && + s.RealPath == s2.RealPath && + s.SocketPath == s2.SocketPath && + s.Leak == s2.Leak && + s.Flags == s2.Flags +} diff --git a/pkg/listener/fd/walk/watcher.go b/pkg/listener/fd/walk/watcher.go index c055c9b..a79191b 100644 --- a/pkg/listener/fd/walk/watcher.go +++ b/pkg/listener/fd/walk/watcher.go @@ -11,46 +11,53 @@ import ( ) type Watcher struct { - store *sync.Map - changed bool - event event.Events - pid int - max int + pid int + store sync.Map + event event.Events + max int + stop chan struct{} } -func New(pid int, store *sync.Map) *Watcher { - return &Watcher{ +func New(pid int) (w *Watcher) { + w = &Watcher{ pid: pid, - store: store, + store: sync.Map{}, + event: event.Events{}, + stop: make(chan struct{}), } + return } -func (w Watcher) Enable() (enabled bool) { +func (w *Watcher) Enable() (enabled bool) { return true } -func (w Watcher) Watch(stop <-chan struct{}, event chan<- event.Events) { +func (w *Watcher) Watch(c chan<- event.Events) { for { select { - case <-stop: + case <-w.stop: return default: - _ = w.do(event) - if len(w.event) > 0 { - event <- w.event + e := event.Events{} + _ = w.do(&e) + if len(e) > 0 { + c <- e } } } } -func (w Watcher) do(e chan<- event.Events) (err error) { - w.changed = false +func (w *Watcher) Close() { + w.stop <- struct{}{} +} + +func (w *Watcher) do(e *event.Events) (err error) { fds, err := fds(w.pid) if err != nil { return } - w.close(fds) - w.openOrChange(fds) + w.closeFd(fds, e) + w.openOrChangeFd(fds, e) return } @@ -76,7 +83,10 @@ func fds(pid int) (fds []int, err error) { } // fds has been sorted -func (w Watcher) close(fds []int) { +func (w *Watcher) closeFd(fds []int, e *event.Events) { + if len(fds) == 0 { + return + } m := max(w.max, fds[len(fds)-1]) var missing []int for i := 0; i < len(fds)-1; i++ { @@ -89,11 +99,15 @@ func (w Watcher) close(fds []int) { } for _, fd := range missing { if _, ok := w.store.LoadAndDelete(fd); ok { - w.changed = true - w.event = append(w.event, event.Event{ + *e = append(*e, event.Event{ Type: event.Close, Pid: w.pid, Fd: fd, + Stat: &stat.Stat{ + FdPath: fmt.Sprintf("/proc/%d/fd/%d", w.pid, fd), + RealPath: "[FD CLOSED]", + Changed: true, + }, }) } } @@ -101,19 +115,19 @@ func (w Watcher) close(fds []int) { return } -func (w Watcher) openOrChange(fds []int) { +func (w *Watcher) openOrChangeFd(fds []int, e *event.Events) { for _, fd := range fds { s, _ := stat.New(w.pid, fd) if old, ok := w.store.LoadOrStore(fd, s); !ok { - w.event = append(w.event, event.Event{ + *e = append(*e, event.Event{ Type: event.Open, Pid: w.pid, Fd: fd, Stat: s, }) } else { - if old != s { - w.event = append(w.event, event.Event{ + if !old.(*stat.Stat).Equals(s) { + *e = append(*e, event.Event{ Type: event.Change, Pid: w.pid, Fd: fd, diff --git a/pkg/listener/fd/watcher.go b/pkg/listener/fd/watcher.go index 4f9f64f..7a1a27c 100644 --- a/pkg/listener/fd/watcher.go +++ b/pkg/listener/fd/watcher.go @@ -1,8 +1,30 @@ package fd -import "github.com/ssst0n3/fd-listener/pkg/listener/fd/event" +import ( + "github.com/fatih/color" + "github.com/ssst0n3/fd-listener/pkg/listener/fd/ebpf" + "github.com/ssst0n3/fd-listener/pkg/listener/fd/event" + "github.com/ssst0n3/fd-listener/pkg/listener/fd/walk" +) type Watcher interface { - Watch(stop <-chan struct{}, event chan<- event.Events) + Watch(e chan<- event.Events) Enable() (enabled bool) + Close() +} + +func NewWatcher(pid int) (watcher Watcher) { + watcher = ebpf.New(pid) + enabled := watcher.Enable() + color.White("[+] ebpf (faster) enabled: %t\n", enabled) + if enabled { + return + } else { + watcher = walk.New(pid) + return + } +} + +func Watch(watcher Watcher, events chan<- event.Events) { + watcher.Watch(events) } diff --git a/pkg/listener/fd/worker.go b/pkg/listener/fd/worker.go index 2ef7309..da9970e 100644 --- a/pkg/listener/fd/worker.go +++ b/pkg/listener/fd/worker.go @@ -2,135 +2,89 @@ package fd import ( "fmt" - "github.com/ssst0n3/fd-listener/pkg" - "os" + "github.com/ssst0n3/fd-listener/pkg/listener/fd/event" + "github.com/ssst0n3/fd-listener/pkg/listener/fd/stat" "sort" - "strconv" "sync" ) type Worker struct { - pid int - Stop chan bool - store sync.Map - max int + pid int + stop chan struct{} + store sync.Map + max int + watcher Watcher + event chan event.Events } func NewWorker(pid int) (w *Worker) { w = &Worker{ - pid: pid, - Stop: make(chan bool), + pid: pid, + stop: make(chan struct{}), + event: make(chan event.Events), } + w.watcher = NewWatcher(pid) go w.Work() return } -func (l *Worker) Work() { +func (w *Worker) Work() { + go w.watcher.Watch(w.event) for { select { - case <-l.Stop: + case <-w.stop: + w.watcher.Close() return - default: - l.do() + case e := <-w.event: + w.do(e) } } } -func (l *Worker) stat(fd int) (stat Stat, err error) { - fdPath := fmt.Sprintf("/proc/%d/fd/%d", l.pid, fd) - realPath, _ := os.Readlink(fdPath) - if realPath == "" { - realPath = "?" - } - stat = Stat{ - FdPath: fdPath, - RealPath: realPath, - } - socketPath, err := Socket(l.pid, realPath) - if err != nil { - return - } - stat.SocketPath = socketPath - leak, err := Leak(fdPath, realPath) - if err != nil { - return - } - stat.Leak = leak - flags, err := pkg.ReadFlags(fmt.Sprintf("/proc/%d/fdinfo/%d", l.pid, fd)) - if err != nil { - return - } - stat.Flags = flags - return +func (w *Worker) Close() { + w.stop <- struct{}{} } -func (l *Worker) do() { - _, err := os.Lstat(fmt.Sprintf("/proc/%d/", l.pid)) - if os.IsNotExist(err) { +func (w *Worker) do(events event.Events) { + if len(events) == 0 { return } - entries, err := os.ReadDir(fmt.Sprintf("/proc/%d/fd", l.pid)) - if err != nil { - fmt.Printf("open /proc/%d/fd failed\n", l.pid) - return - } - var fds []int - for _, fd := range entries { - fd, err := strconv.Atoi(fd.Name()) - if err != nil { - continue - } - fds = append(fds, fd) - } - sort.Ints(fds) - - var changed bool - - // clear empty fd - if len(fds) > 0 { - for i := fds[len(fds)-1] + 1; i < l.max; i++ { - if _, ok := l.store.LoadAndDelete(i); ok { - changed = true - } + var changed []*stat.Stat + var closed []int + for _, e := range events { + fmt.Sprintf("%+v\n", e) + switch e.Type { + case event.Open, event.Change: + e.Stat.Change(true) + changed = append(changed, e.Stat) + w.store.Store(e.Fd, e.Stat) + case event.Close: + closed = append(closed, e.Fd) + w.store.Store(e.Fd, e.Stat) + default: + panic("unhandled default case") } - l.max = fds[len(fds)-1] } - - var last int - for _, fd := range fds { - for i := last + 1; i < fd; i++ { - if _, ok := l.store.LoadAndDelete(i); ok { - changed = true - } - } - last = fd - stat, _ := l.stat(fd) - if old, ok := l.store.Load(fd); !ok { - l.store.Store(fd, stat) - changed = true - } else { - if old != stat { - l.store.Store(fd, stat) - changed = true - } - } + w.print() + for _, s := range changed { + s.Change(false) } - if changed { - l.print() + for _, fd := range closed { + w.store.Delete(fd) } } -func (l *Worker) print() { +func (w *Worker) print() { var keys []int - l.store.Range(func(key any, value any) bool { + w.store.Range(func(key any, value any) bool { fd := key.(int) keys = append(keys, fd) return true }) sort.Ints(keys) for _, fd := range keys { - stat, _ := l.store.Load(fd) - fmt.Println(stat) + s, _ := w.store.Load(fd) + fmt.Println(s) } fmt.Println("----------------") }