From c414e8f884d09c4d8f6895a313c5428936b36a2c Mon Sep 17 00:00:00 2001 From: Derek Higgins Date: Fri, 8 Oct 2021 11:12:27 +0100 Subject: [PATCH] Add ipv6 support to should_bypass_proxies Add support to should_bypass_proxies to support IPv6 ipaddresses and CIDRs in no_proxy. Includes adding IPv6 support to various other helper functions. --- requests/utils.py | 79 +++++++++++++++++++++++++++++++++++++-------- tests/test_utils.py | 61 +++++++++++++++++++++++++++++++--- 2 files changed, 121 insertions(+), 19 deletions(-) diff --git a/requests/utils.py b/requests/utils.py index 97f895ebb3..168d78fdd9 100644 --- a/requests/utils.py +++ b/requests/utils.py @@ -648,18 +648,42 @@ def requote_uri(uri): return quote(uri, safe=safe_without_percent) +def _get_mask_bits(mask, totalbits=32): + """Converts a mask from /xx format to a int + to be used as a mask for IP's in int format + + Example: if mask is 24 function returns 0xFFFFFF00 + if mask is 24 and totalbits=128 function + returns 0xFFFFFF00000000000000000000000000 + + :rtype: int + """ + bits = ((1 << mask) - 1) << (totalbits - mask) + return bits + + def address_in_network(ip, net): """This function allows you to check if an IP belongs to a network subnet Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24 returns False if ip = 192.168.1.1 and net = 192.168.100.0/24 + returns True if ip = 1:2:3:4::1 and net = 1:2:3:4::/64 :rtype: bool """ - ipaddr = struct.unpack('=L', socket.inet_aton(ip))[0] netaddr, bits = net.split('/') - netmask = struct.unpack('=L', socket.inet_aton(dotted_netmask(int(bits))))[0] - network = struct.unpack('=L', socket.inet_aton(netaddr))[0] & netmask + if is_ipv4_address(ip) and is_ipv4_address(netaddr): + ipaddr = struct.unpack('>L', socket.inet_aton(ip))[0] + netmask = _get_mask_bits(int(bits)) + network = struct.unpack('>L', socket.inet_aton(netaddr))[0] + elif is_ipv6_address(ip) and is_ipv6_address(netaddr): + ipaddr_msb, ipaddr_lsb = struct.unpack('>QQ', socket.inet_pton(socket.AF_INET6, ip)) + ipaddr = (ipaddr_msb << 64) ^ ipaddr_lsb + netmask = _get_mask_bits(int(bits), 128) + network_msb, network_lsb = struct.unpack('>QQ', socket.inet_pton(socket.AF_INET6, netaddr)) + network = (network_msb << 64) ^ network_lsb + else: + return False return (ipaddr & netmask) == (network & netmask) @@ -679,12 +703,37 @@ def is_ipv4_address(string_ip): :rtype: bool """ try: - socket.inet_aton(string_ip) + socket.inet_pton(socket.AF_INET, string_ip) + except socket.error: + return False + return True + + +def is_ipv6_address(string_ip): + """ + :rtype: bool + """ + try: + socket.inet_pton(socket.AF_INET6, string_ip) except socket.error: return False return True +def compare_ips(a, b): + """ + Compare 2 IP's, uses socket.inet_pton to normalize IPv6 IPs + + :rtype: bool + """ + if a == b: + return True + try: + return socket.inet_pton(socket.AF_INET6, a) == socket.inet_pton(socket.AF_INET6, b) + except socket.error: + return False + + def is_valid_cidr(string_network): """ Very simple check of the cidr format in no_proxy variable. @@ -692,18 +741,20 @@ def is_valid_cidr(string_network): :rtype: bool """ if string_network.count('/') == 1: + address, mask = string_network.split('/') try: - mask = int(string_network.split('/')[1]) + mask = int(mask) except ValueError: return False - if mask < 1 or mask > 32: - return False - - try: - socket.inet_aton(string_network.split('/')[0]) - except socket.error: - return False + if is_ipv4_address(address): + if mask < 1 or mask > 32: + return False + elif is_ipv6_address(address): + if mask < 1 or mask > 128: + return False + else: + return False else: return False return True @@ -759,12 +810,12 @@ def should_bypass_proxies(url, no_proxy): host for host in no_proxy.replace(' ', '').split(',') if host ) - if is_ipv4_address(parsed.hostname): + if is_ipv4_address(parsed.hostname) or is_ipv6_address(parsed.hostname): for proxy_ip in no_proxy: if is_valid_cidr(proxy_ip): if address_in_network(parsed.hostname, proxy_ip): return True - elif parsed.hostname == proxy_ip: + elif compare_ips(parsed.hostname, proxy_ip): # If no_proxy ip was defined in plain IP notation instead of cidr notation & # matches the IP of the index return True diff --git a/tests/test_utils.py b/tests/test_utils.py index 98ffb25a6c..792f76d3b2 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -21,7 +21,8 @@ requote_uri, select_proxy, should_bypass_proxies, super_len, to_key_val_list, to_native_string, unquote_header_value, unquote_unreserved, - urldefragauth, add_dict_to_cookiejar, set_environ) + urldefragauth, add_dict_to_cookiejar, set_environ, _get_mask_bits, + compare_ips) from requests._internal_utils import unicode_is_ascii from .compat import StringIO, cStringIO @@ -216,8 +217,13 @@ def test_invalid(self, value): class TestIsValidCIDR: - def test_valid(self): - assert is_valid_cidr('192.168.1.0/24') + @pytest.mark.parametrize( + 'value', ( + '192.168.1.0/24', + '1:2:3:4::/64', + )) + def test_valid(self, value): + assert is_valid_cidr(value) @pytest.mark.parametrize( 'value', ( @@ -226,6 +232,11 @@ def test_valid(self): '192.168.1.0/128', '192.168.1.0/-1', '192.168.1.999/24', + '1:2:3:4::1', + '1:2:3:4::/a', + '1:2:3:4::0/321', + '1:2:3:4::/-1', + '1:2:3:4::12211/64', )) def test_invalid(self, value): assert not is_valid_cidr(value) @@ -239,6 +250,12 @@ def test_valid(self): def test_invalid(self): assert not address_in_network('172.16.0.1', '192.168.1.0/24') + def test_valid_v6(self): + assert address_in_network('1:2:3:4::1111', '1:2:3:4::/64') + + def test_invalid_v6(self): + assert not address_in_network('1:2:3:4:1111', '1:2:3:4::/124') + class TestGuessFilename: @@ -628,13 +645,18 @@ def test_urldefragauth(url, expected): ('http://172.16.1.12:5000/', False), ('http://google.com:5000/v1.0/', False), ('file:///some/path/on/disk', True), + ('http://[1:2:3:4:5:6:7:8]:5000/', True), + ('http://[1:2:3:4::1]/', True), + ('http://[1:2:3:9::1]/', True), + ('http://[1:2:3:9:0:0:0:1]/', True), + ('http://[1:2:3:9::2]/', False), )) def test_should_bypass_proxies(url, expected, monkeypatch): """Tests for function should_bypass_proxies to check if proxy can be bypassed or not """ - monkeypatch.setenv('no_proxy', '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1, google.com:6000') - monkeypatch.setenv('NO_PROXY', '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1, google.com:6000') + monkeypatch.setenv('no_proxy', '192.168.0.0/24,127.0.0.1,localhost.localdomain,1:2:3:4::/64,1:2:3:9::1,172.16.1.1, google.com:6000') + monkeypatch.setenv('NO_PROXY', '192.168.0.0/24,127.0.0.1,localhost.localdomain,1:2:3:4::/64,1:2:3:9::1,172.16.1.1, google.com:6000') assert should_bypass_proxies(url, no_proxy=None) == expected @@ -785,3 +807,32 @@ def test_set_environ_raises_exception(): raise Exception('Expected exception') assert 'Expected exception' in str(exception.value) + + +@pytest.mark.parametrize( + 'mask, totalbits, maskbits', ( + (24, None, 0xFFFFFF00), + (31, None, 0xFFFFFFFE), + (0, None, 0x0), + (4, 4, 0xF), + (24, 128, 0xFFFFFF00000000000000000000000000), + )) +def test__get_mask_bits(mask, totalbits, maskbits): + args = {"mask": mask} + if totalbits: + args["totalbits"] = totalbits + assert _get_mask_bits(**args) == maskbits + + +@pytest.mark.parametrize( + 'a, b, expected', ( + ('1.2.3.4', '1.2.3.4', True), + ('1.2.3.4', '2.2.3.4', False), + ('1::4', '1.2.3.4', False), + ('1::4', '1::4', True), + ('1::4', '1:0:0:0:0:0:0:4', True), + ('1::4', '1:0:0:0:0:0::4', True), + ('1::4', '1:0:0:0:0:0:1:4', False), + )) +def test_compare_ips(a, b, expected): + assert compare_ips(a, b) == expected