Skip to content

Commit

Permalink
Fix resource leak bug in auditbeat/socket (#41080) (#41137)
Browse files Browse the repository at this point in the history
* fix resource leak bug in auditbeat/socket

* linter..

* linter...

(cherry picked from commit 12e846b)

Co-authored-by: Alex K. <[email protected]>
  • Loading branch information
mergify[bot] and fearful-symmetry authored Oct 14, 2024
1 parent 8048cb4 commit c01af07
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 20 deletions.
24 changes: 6 additions & 18 deletions x-pack/auditbeat/module/system/socket/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,22 +157,6 @@ func (e *tcpConnectResult) Update(s *state) error {
return fmt.Errorf("stored thread event has unexpected type %T", ev)
}

var tcpStates = []string{
"(zero)",
"TCP_ESTABLISHED",
"TCP_SYN_SENT",
"TCP_SYN_RECV",
"TCP_FIN_WAIT1",
"TCP_FIN_WAIT2",
"TCP_TIME_WAIT",
"TCP_CLOSE",
"TCP_CLOSE_WAIT",
"TCP_LAST_ACK",
"TCP_LISTEN",
"TCP_CLOSING",
"TCP_NEW_SYN_RECV",
}

type tcpAcceptResult struct {
Meta tracing.Metadata `kprobe:"metadata"`
Sock uintptr `kprobe:"sock"`
Expand Down Expand Up @@ -894,7 +878,7 @@ func (e *execveCall) getProcess() *process {
var err error
p.path, err = filepath.EvalSymlinks(fmt.Sprintf("/proc/%d/exe", e.Meta.PID))
if err != nil {
if pe, ok := err.(*os.PathError); ok && strings.Contains(pe.Path, "(deleted)") {
if pe, ok := err.(*os.PathError); ok && strings.Contains(pe.Path, "(deleted)") { //nolint:errorlint // we're fetching the string body
// Keep the deleted path from the PathError.
p.path = pe.Path
// Keep the basename in case we can't get the process name.
Expand Down Expand Up @@ -1048,9 +1032,13 @@ func (e *doExit) String() string {

// Update the state with the contents of this event.
func (e *doExit) Update(s *state) (err error) {
// Only report exits of the main thread, a.k.a process exit
// Report exit of the main thread,
// or a TID that was originally reported by doFork.
if e.Meta.PID == e.Meta.TID {
err = s.TerminateProcess(e.Meta.PID)

} else if e.Meta.PID != e.Meta.TID && s.processExists(e.Meta.TID) {
err = s.TerminateProcess(e.Meta.TID)
}
// Cleanup any saved thread state
s.ThreadLeave(e.Meta.TID)
Expand Down
19 changes: 17 additions & 2 deletions x-pack/auditbeat/module/system/socket/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,12 @@ func (dt *dnsTracker) AddTransaction(tr dns.Transaction) {
}
}
var list []dns.Transaction
var ok bool
if prev := dt.transactionByClient.Get(clientAddr); prev != nil {
list = prev.([]dns.Transaction)
list, ok = prev.([]dns.Transaction)
if !ok {
return
}
}
list = append(list, tr)
dt.transactionByClient.Put(clientAddr, list)
Expand All @@ -332,7 +336,11 @@ func (dt *dnsTracker) RegisterEndpoint(addr net.UDPAddr, proc *process) {
key := addr.String()
dt.processByClient.Put(key, proc)
if listIf := dt.transactionByClient.Get(key); listIf != nil {
list := listIf.([]dns.Transaction)
list, ok := listIf.([]dns.Transaction)
if !ok {
return
}

for _, tr := range list {
proc.addTransaction(tr)
}
Expand Down Expand Up @@ -577,6 +585,13 @@ func (s *state) TerminateProcess(pid uint32) error {
return nil
}

func (s *state) processExists(pid uint32) bool {
s.Lock()
defer s.Unlock()
_, ok := s.processes[pid]
return ok
}

func (s *state) getProcess(pid uint32) *process {
if pid == 0 {
return &kernelProcess
Expand Down
73 changes: 73 additions & 0 deletions x-pack/auditbeat/module/system/socket/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"

"github.com/elastic/beats/v7/auditbeat/tracing"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/x-pack/auditbeat/module/system"
"github.com/elastic/beats/v7/x-pack/auditbeat/module/system/socket/dns"
"github.com/elastic/elastic-agent-libs/logp"
)

type logWrapper testing.T
Expand Down Expand Up @@ -88,6 +90,76 @@ func makeTestingState(t *testing.T, inactiveTimeout, socketTimeout, closeTimeout
return ts
}

func TestDNSMemoryUsage(t *testing.T) {
logp.DevelopmentSetup()
rootPID := uint32(1234)
childThread1 := uint32(1235)
childThread2 := uint32(1236)
st := makeTestingState(t, time.Second, time.Second, 0, time.Second)
// construct a fake series of DNS events process events
err := st.OnDNSTransaction(dns.Transaction{
Client: net.UDPAddr{IP: net.ParseIP("192.168.1.2"), Port: 34074},
Server: net.UDPAddr{IP: net.ParseIP("192.168.1.53"), Port: 53},
Domain: "example.com",
Addresses: []net.IP{net.ParseIP("10.10.10.10")},
})
require.NoError(t, err)
err = st.OnDNSTransaction(dns.Transaction{
Client: net.UDPAddr{IP: net.ParseIP("192.168.1.2"), Port: 34074},
Server: net.UDPAddr{IP: net.ParseIP("192.168.1.53"), Port: 53},
Domain: "elastic.co",
Addresses: []net.IP{net.ParseIP("10.10.10.11")},
})
require.NoError(t, err)
events1 := []event{
callExecve(meta(rootPID, rootPID, 1), []string{"/usr/bin/curl"}),
&execveRet{Meta: meta(rootPID, rootPID, 2), Retval: int32(rootPID)},
&udpSendMsgCall{
Meta: meta(rootPID, rootPID, 2),
LAddr: 33663168,
RAddr: 889301184,
LPort: 6789, // ports are in network byte order
RPort: 13568,
Size: 55,
SIPtr: uintptr(1),
SIAF: unix.AF_INET,
},
&udpQueueRcvSkb{
Meta: meta(rootPID, rootPID, 2),
LAddr: 33663168,
LPort: 6789,
Size: 55,
IPHdr: 1,
UDPHdr: 21,
Packet: [256]byte{
// network body, starting with IP header
0x00,
0x40, 0x00, 0x00, 0x20,
0x00, 0x01, 0x00, 0x00,
0xff, 0x11, 0xff, 0xff,
0xc0, 0xa8, 0x1, 0x35,
0xc0, 0xa8, 0x1, 0x2,
0x00, 0x35,
},
}, &forkRet{Meta: meta(rootPID, rootPID, 2), Retval: int(childThread1)},
&forkRet{Meta: meta(rootPID, rootPID, 2), Retval: int(childThread2)},

&doExit{Meta: meta(rootPID, childThread1, 2)},
&doExit{Meta: meta(rootPID, childThread2, 2)},
}
st.feedEvents(events1)

// ensure that we only have one PID in the map,
// and that's the root pid
t.Logf("Got procs: %d", len(st.processes))
require.Equal(t, 1, len(st.processes))
require.NotNil(t, st.processes[rootPID])

// now close the final process, make sure we've cleaned up the final process
st.feedEvents([]event{&doExit{Meta: meta(rootPID, rootPID, 2)}})
require.Equal(t, 0, len(st.processes))
}

func TestTCPConnWithProcess(t *testing.T) {
const (
localIP = "192.168.33.10"
Expand All @@ -97,6 +169,7 @@ func TestTCPConnWithProcess(t *testing.T) {
sock uintptr = 0xff1234
)
st := makeTestingState(t, time.Second, time.Second, 0, time.Second)

lPort, rPort := be16(localPort), be16(remotePort)
lAddr, rAddr := ipv4(localIP), ipv4(remoteIP)
evs := []event{
Expand Down

0 comments on commit c01af07

Please sign in to comment.