-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathfiltering_test.py
184 lines (157 loc) · 5.76 KB
/
filtering_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import ctypes
from unittest.mock import MagicMock
import pytest
from pidtree_bcc.filtering import CFilterKey
from pidtree_bcc.filtering import CFilterValue
from pidtree_bcc.filtering import CPortRange
from pidtree_bcc.filtering import load_filters_into_map
from pidtree_bcc.filtering import load_intset_into_map
from pidtree_bcc.filtering import load_port_filters_into_map
from pidtree_bcc.filtering import NetFilter
from pidtree_bcc.filtering import port_range_mapper
from pidtree_bcc.filtering import PortFilterMode
from pidtree_bcc.utils import ip_to_int
@pytest.fixture
def net_filtering():
return NetFilter(
[
{
'network': '127.0.0.1',
'network_mask': '255.0.0.0',
},
{
'network': '10.0.0.0',
'network_mask': '255.0.0.0',
'except_ports': [123],
},
{
'network': '192.168.0.0',
'network_mask': '255.255.0.0',
'include_ports': [123],
},
],
)
def test_filter_ip_int(net_filtering):
assert net_filtering.is_filtered(ip_to_int('127.1.33.7'), 80)
assert not net_filtering.is_filtered(ip_to_int('1.2.3.4'), 80)
def test_filter_ip_str(net_filtering):
assert net_filtering.is_filtered('127.1.33.7', 80)
assert not net_filtering.is_filtered('1.2.3.4', 80)
def test_filter_ip_except_port(net_filtering):
assert net_filtering.is_filtered('10.1.2.3', 80)
assert not net_filtering.is_filtered('10.1.2.3', 123)
def test_filter_ip_include_port(net_filtering):
assert net_filtering.is_filtered('192.168.0.1', 123)
assert not net_filtering.is_filtered('192.168.0.1', 80)
def test_load_filters_into_map():
mock_filters = [
{
'network': '127.0.0.0',
'network_mask': '255.0.0.0',
},
{
'network': '10.0.0.0',
'network_mask': '255.0.0.0',
'except_ports': [123, 456],
},
{
'network': '192.168.0.0',
'network_mask': '255.255.0.0',
'include_ports': ['100-200'],
},
]
res_map = {}
load_filters_into_map(mock_filters, res_map)
assert res_map == {
CFilterKey(prefixlen=8, data=127): CFilterValue(mode=0, range_size=0),
CFilterKey(prefixlen=8, data=10): CFilterValue(
mode=1,
range_size=2,
ranges=CFilterValue.range_array_t(CPortRange(123, 123), CPortRange(456, 456)),
),
CFilterKey(prefixlen=16, data=43200): CFilterValue(
mode=2, range_size=1, ranges=CFilterValue.range_array_t(CPortRange(100, 200)),
),
}
def test_load_filters_into_map_diff():
mock_filters = [
{
'network': '127.0.0.0',
'network_mask': '255.0.0.0',
},
{
'network': '10.0.0.0',
'network_mask': '255.0.0.0',
'except_ports': [123, 456],
},
]
res_map = {CFilterKey(prefixlen=8, data=127): 'foo', CFilterKey(prefixlen=16, data=43200): 'bar'}
load_filters_into_map(mock_filters, res_map, True)
assert res_map == {
CFilterKey(prefixlen=8, data=127): CFilterValue(mode=0, range_size=0),
CFilterKey(prefixlen=8, data=10): CFilterValue(
mode=1,
range_size=2,
ranges=CFilterValue.range_array_t(CPortRange(123, 123), CPortRange(456, 456)),
),
}
@pytest.mark.parametrize(
'filter_input,mode,expected',
[
((22, 80, 443), PortFilterMode.include, {0: 2, 22: 1, 80: 1, 443: 1}),
(('10-20',), PortFilterMode.exclude, {0: 1, **{i: 1 for i in range(10, 21)}}),
((1, '2-9', 10), PortFilterMode.exclude, {i: 1 for i in range(11)}),
],
)
def test_load_port_filters_into_map(filter_input, mode, expected):
res_map = MagicMock()
load_port_filters_into_map(filter_input, mode, res_map)
assert expected == {
call_args[0][0].value: call_args[0][1].value
for call_args in res_map.__setitem__.call_args_list
}
def test_load_port_filters_into_map_diff():
res_map = MagicMock()
res_map.items.return_value = [(ctypes.c_int(i), ctypes.c_uint8(i % 2)) for i in range(16)]
load_port_filters_into_map(range(1, 6), PortFilterMode.include, res_map, True)
assert {
0: 2,
**{i: 1 for i in range(1, 6)},
**{i: 0 for i in range(6, 16) if i % 2 > 0},
} == {
call_args[0][0].value: call_args[0][1].value
for call_args in res_map.__setitem__.call_args_list
}
def test_port_range_mapper():
assert list(port_range_mapper('22-80')) == list(range(22, 81))
assert list(port_range_mapper('0-10')) == list(range(1, 11))
assert list(port_range_mapper('100-100000000')) == list(range(100, 65535))
def test_lpm_trie_key():
assert (
CFilterKey.from_network_definition('255.255.0.0', '192.168.0.0')
== CFilterKey.from_network_definition('255.255.0.0', '192.168.2.3')
)
@pytest.mark.parametrize(
'intset,initial,expected,do_diff',
(
({1, 2, 3}, {}, {1: 1, 2: 1, 3: 1}, False),
({2, 3}, {1: 1}, {1: 1, 2: 1, 3: 1}, False),
({2, 3}, {1: 1}, {2: 1, 3: 1}, True),
),
)
def test_load_intset_into_map(intset, initial, expected, do_diff):
mapping = MagicMock()
mapping.items.return_value = [(ctypes.c_int(k), ctypes.c_uint8(v)) for k, v in initial.items()]
load_intset_into_map(intset, mapping, do_diff)
assert expected == {
**({} if do_diff else initial),
**{
call_args[0][0].value: call_args[0][1].value
for call_args in mapping.__setitem__.call_args_list
},
}
if do_diff:
assert (set(initial) - intset) == {
call_args[0][0].value
for call_args in mapping.__delitem__.call_args_list
}