From 2f1ea5cd0e7cd1c6ee4c17468570dbbc02952504 Mon Sep 17 00:00:00 2001 From: Maximilian Lindner <46794237+SR4ven@users.noreply.github.com> Date: Sun, 5 Nov 2023 18:29:06 +0100 Subject: [PATCH] Fix invalid reply of MiniTestServer (#693) --- unit_tests/test_socket_connection.py | 74 ++++++++++++++++------------ 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/unit_tests/test_socket_connection.py b/unit_tests/test_socket_connection.py index 337fa30e..c8f9fdb7 100644 --- a/unit_tests/test_socket_connection.py +++ b/unit_tests/test_socket_connection.py @@ -13,7 +13,14 @@ import pytest from boofuzz import helpers -from boofuzz.connections import ip_constants, SocketConnection +from boofuzz.connections import ( + ip_constants, + RawL2SocketConnection, + RawL3SocketConnection, + SocketConnection, + TCPSocketConnection, + UDPSocketConnection, +) from boofuzz.connections.raw_l3_socket_connection import ETH_P_ALL, ETH_P_IP THREAD_WAIT_TIMEOUT = 10 # Time to wait for a thread before considering it failed. @@ -22,6 +29,7 @@ IP_HEADER_LEN = 20 ETHER_TYPE_IPV4 = struct.pack(">H", ETH_P_IP) # Ethernet frame EtherType for IPv4 +ETHER_IPV4_HEADER = b"\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00" TEST_ERR_NO_NON_LOOPBACK_IPV4 = "No local non-loopback IPv4 address found." @@ -277,7 +285,7 @@ def test_tcp_client(self): t.daemon = True t.start() - uut = SocketConnection(host=socket.gethostname(), port=server.active_port, proto="tcp") + uut = TCPSocketConnection(host=socket.gethostname(), port=server.active_port) uut.logger = logging.getLogger("SulleyUTLogger") # When @@ -312,7 +320,7 @@ def test_tcp_client_timeout(self): t.daemon = True t.start() - uut = SocketConnection(host=socket.gethostname(), port=server.active_port, proto="tcp") + uut = TCPSocketConnection(host=socket.gethostname(), port=server.active_port) uut.logger = logging.getLogger("SulleyUTLogger") # When @@ -350,9 +358,7 @@ def test_udp_client(self): t.daemon = True t.start() - uut = SocketConnection( - host=socket.gethostname(), port=server.active_port, proto="udp", bind=(socket.gethostname(), 0) - ) + uut = UDPSocketConnection(host=socket.gethostname(), port=server.active_port, bind=(socket.gethostname(), 0)) uut.logger = logging.getLogger("SulleyUTLogger") # When @@ -399,12 +405,11 @@ def test_udp_broadcast_client(self): t.daemon = True t.start() - uut = SocketConnection( + uut = UDPSocketConnection( host=broadcast_addr, port=server.active_port, - proto="udp", bind=("", server.active_port + 1), - udp_broadcast=True, + broadcast=True, ) uut.logger = logging.getLogger("BoofuzzUTLogger") @@ -446,10 +451,11 @@ def test_raw_l2(self): # Given server = MiniTestServer(proto="raw", host="lo") - server.data_to_send = b"GKC" + server.data_to_send = ETHER_IPV4_HEADER + b"GKC" server.bind() - uut = SocketConnection(host="lo", proto="raw-l2", recv_timeout=0.1) + # uut = SocketConnection(host="lo", proto="raw-l2", recv_timeout=0.1) + uut = RawL2SocketConnection(interface="lo", ethernet_proto=0x0003, send_timeout=0.1, recv_timeout=0.1) uut.logger = logging.getLogger("SulleyUTLogger") # Assemble packet... @@ -471,6 +477,7 @@ def test_raw_l2(self): # When uut.open() send_result = uut.send(data=raw_packet) + _ = uut.recv(10000) # Discard the data we just sent ourself received = uut.recv(10000) uut.close() @@ -481,7 +488,7 @@ def test_raw_l2(self): # Then self.assertEqual(send_result, len(expected_server_receive)) self.assertEqual(raw_packet, server.received) - self.assertEqual(received, b"") + self.assertEqual(received, server.data_to_send) @pytest.mark.skipif(sys.platform in ["win32", "darwin"], reason="Raw sockets not supported on Windows/Mac OS.") def test_raw_l2_max_size(self): @@ -499,10 +506,10 @@ def test_raw_l2_max_size(self): # Given server = MiniTestServer(proto="raw", host="lo") - server.data_to_send = b"GKC" + server.data_to_send = ETHER_IPV4_HEADER + b"GKC" server.bind() - uut = SocketConnection(host="lo", proto="raw-l2", recv_timeout=0.1) + uut = RawL2SocketConnection(interface="lo", ethernet_proto=0x0003, send_timeout=0.1, recv_timeout=0.1) uut.logger = logging.getLogger("SulleyUTLogger") data_to_send = b"1" * uut.max_send_size @@ -517,6 +524,7 @@ def test_raw_l2_max_size(self): # When uut.open() send_result = uut.send(data=raw_packet) + _ = uut.recv(10000) # Discard the data we just sent ourself received = uut.recv(10000) uut.close() @@ -527,7 +535,7 @@ def test_raw_l2_max_size(self): # Then self.assertEqual(send_result, uut.max_send_size) self.assertEqual(expected_server_receive, server.received) - self.assertEqual(received, b"") + self.assertEqual(received, server.data_to_send) @pytest.mark.skipif(sys.platform in ["win32", "darwin"], reason="Raw sockets not supported on Windows/Mac OS.") def test_raw_l2_oversized(self): @@ -545,10 +553,10 @@ def test_raw_l2_oversized(self): # Given server = MiniTestServer(proto="raw", host="lo") - server.data_to_send = b"GKC" + server.data_to_send = ETHER_IPV4_HEADER + b"GKC" server.bind() - uut = SocketConnection(host="lo", proto="raw-l2", recv_timeout=0.1) + uut = RawL2SocketConnection(interface="lo", ethernet_proto=0x0003, send_timeout=0.1, recv_timeout=0.1) uut.logger = logging.getLogger("SulleyUTLogger") data_to_send = b"F" * (uut.max_send_size + 1) @@ -563,6 +571,7 @@ def test_raw_l2_oversized(self): # When uut.open() send_result = uut.send(data=raw_packet) + _ = uut.recv(10000) # Discard the data we just sent ourself received = uut.recv(10000) uut.close() @@ -573,7 +582,7 @@ def test_raw_l2_oversized(self): # Then self.assertEqual(send_result, uut.max_send_size) self.assertEqual(expected_server_receive, server.received) - self.assertEqual(received, b"") + self.assertEqual(received, server.data_to_send) @pytest.mark.skipif(sys.platform in ["win32", "darwin"], reason="Raw sockets not supported on Windows/Mac OS.") def test_raw_l3(self): @@ -595,10 +604,10 @@ def test_raw_l3(self): # Given server = MiniTestServer(proto="raw", host="lo") - server.data_to_send = b"GKC" + server.data_to_send = ETHER_IPV4_HEADER + b"GKC" server.bind() - uut = SocketConnection(host="lo", proto="raw-l3") + uut = RawL3SocketConnection(interface="lo") uut.logger = logging.getLogger("SulleyUTLogger") # Assemble packet... @@ -607,7 +616,7 @@ def test_raw_l3(self): src_ip=b"\x7F\x00\x00\x01", dst_ip=b"\x7F\x00\x00\x01", ) - expected_server_receive = b"\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00" + raw_packet + expected_server_receive = ETHER_IPV4_HEADER + raw_packet t = threading.Thread(target=functools.partial(server.receive_until, expected_server_receive)) t.daemon = True @@ -616,6 +625,7 @@ def test_raw_l3(self): # When uut.open() send_result = uut.send(data=raw_packet) + _ = uut.recv(10000) # Discard the data we just sent ourself received = uut.recv(10000) uut.close() @@ -626,7 +636,7 @@ def test_raw_l3(self): # Then self.assertEqual(send_result, len(raw_packet)) self.assertEqual(expected_server_receive, server.received) - self.assertEqual(received, raw_packet) + self.assertEqual(received, b"GKC") @pytest.mark.skipif(sys.platform in ["win32", "darwin"], reason="Raw sockets not supported on Windows/Mac OS.") def test_raw_l3_max_size(self): @@ -644,16 +654,16 @@ def test_raw_l3_max_size(self): # Given server = MiniTestServer(proto="raw", host="lo") - server.data_to_send = b"GKC" + server.data_to_send = ETHER_IPV4_HEADER + b"GKC" server.bind() - uut = SocketConnection(host="lo", proto="raw-l3") + uut = RawL3SocketConnection(interface="lo") uut.logger = logging.getLogger("SulleyUTLogger") data_to_send = b"0" * uut.packet_size # Assemble packet... raw_packet = data_to_send - expected_server_receive = b"\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00" + raw_packet + expected_server_receive = ETHER_IPV4_HEADER + raw_packet t = threading.Thread(target=functools.partial(server.receive_until, expected_server_receive)) t.daemon = True @@ -662,6 +672,7 @@ def test_raw_l3_max_size(self): # When uut.open() send_result = uut.send(data=raw_packet) + _ = uut.recv(10000) # Discard the data we just sent ourself received = uut.recv(10000) uut.close() @@ -672,7 +683,7 @@ def test_raw_l3_max_size(self): # Then self.assertEqual(send_result, uut.packet_size) self.assertEqual(expected_server_receive, server.received) - self.assertEqual(received, data_to_send) + self.assertEqual(received, b"GKC") @pytest.mark.skipif(sys.platform in ["win32", "darwin"], reason="Raw sockets not supported on Windows/Mac OS.") def test_raw_l3_oversized(self): @@ -690,18 +701,16 @@ def test_raw_l3_oversized(self): # Given server = MiniTestServer(proto="raw", host="lo") - server.data_to_send = b"GKC" + server.data_to_send = ETHER_IPV4_HEADER + b"GKC" server.bind() - uut = SocketConnection(host="lo", proto="raw-l3") + uut = RawL3SocketConnection(interface="lo") uut.logger = logging.getLogger("SulleyUTLogger") data_to_send = b"D" * (uut.packet_size + 1) # Assemble packet... raw_packet = data_to_send - expected_server_receive = ( - b"\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00" + raw_packet[: uut.packet_size] - ) + expected_server_receive = ETHER_IPV4_HEADER + raw_packet[: uut.packet_size] t = threading.Thread(target=functools.partial(server.receive_until, expected_server_receive)) t.daemon = True @@ -710,6 +719,7 @@ def test_raw_l3_oversized(self): # When uut.open() send_result = uut.send(data=raw_packet) + _ = uut.recv(10000) # Discard the data we just sent ourself received = uut.recv(10000) uut.close() @@ -720,7 +730,7 @@ def test_raw_l3_oversized(self): # Then self.assertEqual(send_result, uut.packet_size) self.assertEqual(expected_server_receive, server.received) - self.assertEqual(received, data_to_send[:-1]) + self.assertEqual(received, b"GKC") def test_required_args_port(self): """