Skip to content

Commit

Permalink
snapshot sidecar for net_listen probe
Browse files Browse the repository at this point in the history
piax93 committed Oct 27, 2020
1 parent 31d4281 commit c6d3740
Showing 5 changed files with 149 additions and 6 deletions.
1 change: 1 addition & 0 deletions example_config.yml
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ tcp_connect:
- '/etc/hosts'
attribute_key: "source_host"
net_listen:
snapshot_periodicity: 43200
protocols: [tcp]
excludeports:
- 22222
23 changes: 17 additions & 6 deletions pidtree_bcc/probes/__init__.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
import re
from datetime import datetime
from multiprocessing import SimpleQueue
from threading import Thread
from typing import Any

from bcc import BPF
@@ -21,6 +22,11 @@ class BPFProbe:
In either case the program text will be processed in Jinja templating.
"""

# List of (function, args) tuples to run in parallel with the probes as "sidecars"
# No health monitoring is performed on these after launch so they are expect to be
# stable or self-healing.
SIDECARS = []

def __init__(self, output_queue: SimpleQueue, probe_config: dict = {}):
""" Constructor
@@ -39,23 +45,26 @@ class variable defining a list of config fields.
if not hasattr(self, 'BPF_TEXT'):
with open(re.sub(r'\.py$', '.j2', module_src)) as f:
self.BPF_TEXT = f.read()
template_config = (
{**self.CONFIG_DEFAULTS, **probe_config}
if hasattr(self, 'CONFIG_DEFAULTS')
else probe_config.copy()
)
if hasattr(self, 'TEMPLATE_VARS'):
template_config = {k: probe_config[k] for k in self.TEMPLATE_VARS}
template_config = {k: template_config[k] for k in self.TEMPLATE_VARS}
else:
template_config = probe_config.copy()
template_config.pop('plugins', None)
if hasattr(self, 'CONFIG_DEFAULTS'):
template_config = {**self.CONFIG_DEFAULTS, **template_config}
self.expanded_bpf_text = Template(self.BPF_TEXT).render(**template_config)

def _process_events(self, cpu: Any, data: Any, size: Any):
def _process_events(self, cpu: Any, data: Any, size: Any, from_bpf: bool = True):
""" BPF event callback
:param Any cpu: unused arg required for callback
:param Any data: BPF raw event
:param Any size: unused arg required for callback
:param bool from_bpf: event generated by BPF code
"""
event = self.bpf['events'].event(data)
event = self.bpf['events'].event(data) if from_bpf else data
event = self.enrich_event(event)
event['timestamp'] = datetime.utcnow().isoformat() + 'Z'
event['probe'] = self.probe_name
@@ -65,6 +74,8 @@ def _process_events(self, cpu: Any, data: Any, size: Any):

def start_polling(self):
""" Start infinite loop polling BPF events """
for func, args in self.SIDECARS:
Thread(target=func, args=args, daemon=True).start()
self.bpf = BPF(text=self.expanded_bpf_text)
self.bpf['events'].open_perf_buffer(self._process_events)
while True:
67 changes: 67 additions & 0 deletions pidtree_bcc/probes/net_listen.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
import inspect
import socket
import time
import traceback
from collections import namedtuple
from itertools import chain
from multiprocessing import SimpleQueue
from typing import Any

import psutil

from pidtree_bcc.probes import BPFProbe
from pidtree_bcc.utils import crawl_process_tree
from pidtree_bcc.utils import int_to_ip
from pidtree_bcc.utils import ip_to_int
from pidtree_bcc.utils import never_crash


NetListenWrapper = namedtuple('NetListenWrapper', ('pid', 'laddr', 'port', 'protocol'))


class NetListenProbe(BPFProbe):
@@ -21,8 +31,32 @@ class NetListenProbe(BPFProbe):
'protocols': ['tcp'],
'excludeaddress': [],
'excludeports': [],
'snapshot_periodicity': False,
}

def __init__(self, output_queue: SimpleQueue, config: dict = {}):
super().__init__(output_queue, config)

def port_range_mapper(port_range: str):
from_p, to_p = map(int, port_range.split('-'))
return max(0, from_p), min(65535, to_p + 1)

config = {**self.CONFIG_DEFAULTS, **config}
self.log_tcp = 'tcp' in config['protocols']
self.log_udp = 'udp' in config['protocols']
self.excludeaddrs = set(config['excludeaddress'])
self.excludeports = set(
chain.from_iterable(
range(*port_range_mapper(p)) if '-' in p else [int(p)]
for p in map(str, config['excludeports'])
),
)
if config['snapshot_periodicity']:
self.SIDECARS.append((
self._snapshot_worker,
(config['snapshot_periodicity'],),
))

def enrich_event(self, event: Any) -> dict:
""" Parses network "listen event" and adds process tree data
@@ -43,3 +77,36 @@ def enrich_event(self, event: Any) -> dict:
'protocol': self.PROTO_MAP.get(event.protocol, 'unknown'),
'error': error,
}

@never_crash
def _snapshot_worker(self, periodicity: int):
""" Handler function for snapshot thread.
:param int periodicity: how many seconds to wait between snapshots
"""
time.sleep(300) # sleep 5 minutes to avoid "noisy" restarts
while True:
socket_stats = psutil.net_connections('inet4')
for conn in socket_stats:
if not conn.pid:
# filter out entries without associated PID
continue
if self.log_tcp and conn.status == 'LISTEN':
protocol = socket.IPPROTO_TCP
elif self.log_udp and conn.status == 'NONE' and conn.type == socket.SOCK_DGRAM:
protocol = socket.IPPROTO_UDP
else:
protocol = None
if (
protocol
and conn.laddr.ip not in self.excludeaddrs
and conn.laddr.port not in self.excludeports
):
event = NetListenWrapper(
conn.pid,
ip_to_int(conn.laddr.ip),
conn.laddr.port,
protocol,
)
self._process_events(None, event, None, False)
time.sleep(periodicity)
17 changes: 17 additions & 0 deletions pidtree_bcc/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import functools
import importlib
import inspect
import logging
import socket
import struct
import sys
from typing import Callable
from typing import List
from typing import TextIO
from typing import Type
@@ -79,3 +82,17 @@ def int_to_ip(encoded_ip: int) -> str:
:return: dot-notation IP
"""
return socket.inet_ntoa(struct.pack('<L', encoded_ip))


def never_crash(func: Callable) -> Callable:
""" Decorator for Thread targets which ensures the thread keeps
running by chatching any exception.
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
while True:
try:
return func(*args, **kwargs)
except Exception as e:
logging.error('Error executing {}: {}'.format(func.__name__, e))
return wrapper
47 changes: 47 additions & 0 deletions tests/net_listen_probe_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import socket
from unittest.mock import call
from unittest.mock import MagicMock
from unittest.mock import patch

import pytest

from pidtree_bcc.probes.net_listen import NetListenProbe
from pidtree_bcc.probes.net_listen import NetListenWrapper


@patch('pidtree_bcc.probes.net_listen.crawl_process_tree')
@@ -31,3 +36,45 @@ def test_net_listen_enrich_event(mock_crawl):
'error': '',
}
mock_crawl.assert_called_once_with(123)


@patch('pidtree_bcc.probes.net_listen.time')
@patch('pidtree_bcc.probes.net_listen.psutil')
def test_net_listen_snapshot_worker(mock_psutil, mock_time):
mock_time.sleep.side_effect = [None, Exception('foobar')] # to stop inf loop
mock_psutil.net_connections.return_value = [
MagicMock(
pid=111,
status='LISTEN',
laddr=MagicMock(ip='127.0.0.1', port=1337),
),
MagicMock(
pid=112,
status='LISTEN',
laddr=MagicMock(ip='127.0.0.1', port=80),
),
MagicMock(
pid=113,
status='NONE',
laddr=MagicMock(ip='127.0.0.1', port=7331),
type=socket.SOCK_DGRAM,
),
MagicMock(
pid=None,
),
]
probe = NetListenProbe(None, {'excludeports': ['0-100'], 'protocols': ['udp', 'tcp']})
with patch.object(probe, '_process_events') as mock_process:
# never_crash uses functools.wraps so we can extract the wrapped method
undecorated_method = probe._snapshot_worker.__wrapped__
# assert we catch the inf loop stopping exception
with pytest.raises(Exception, match='foobar'):
# the undecorated method is not bound to the object,
# so we need to pass `probe` as `self`
undecorated_method(probe, 123)
mock_process.assert_has_calls([
call(None, NetListenWrapper(111, 16777343, 1337, 6), None, False),
call(None, NetListenWrapper(113, 16777343, 7331, 17), None, False),
])
mock_psutil.net_connections.assert_called_once_with('inet4')
mock_time.sleep.assert_has_calls([call(300), call(123)])

0 comments on commit c6d3740

Please sign in to comment.