Skip to content

Commit

Permalink
Fix invalid reply of MiniTestServer (#693)
Browse files Browse the repository at this point in the history
  • Loading branch information
SR4ven authored Nov 5, 2023
1 parent cffea3d commit 2f1ea5c
Showing 1 changed file with 42 additions and 32 deletions.
74 changes: 42 additions & 32 deletions unit_tests/test_socket_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@
import pytest

from boofuzz import helpers
from boofuzz.connections import ip_constants, SocketConnection
from boofuzz.connections import (
ip_constants,
RawL2SocketConnection,
RawL3SocketConnection,
SocketConnection,
TCPSocketConnection,
UDPSocketConnection,
)
from boofuzz.connections.raw_l3_socket_connection import ETH_P_ALL, ETH_P_IP

THREAD_WAIT_TIMEOUT = 10 # Time to wait for a thread before considering it failed.
Expand All @@ -22,6 +29,7 @@
IP_HEADER_LEN = 20

ETHER_TYPE_IPV4 = struct.pack(">H", ETH_P_IP) # Ethernet frame EtherType for IPv4
ETHER_IPV4_HEADER = b"\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00"

TEST_ERR_NO_NON_LOOPBACK_IPV4 = "No local non-loopback IPv4 address found."

Expand Down Expand Up @@ -277,7 +285,7 @@ def test_tcp_client(self):
t.daemon = True
t.start()

uut = SocketConnection(host=socket.gethostname(), port=server.active_port, proto="tcp")
uut = TCPSocketConnection(host=socket.gethostname(), port=server.active_port)
uut.logger = logging.getLogger("SulleyUTLogger")

# When
Expand Down Expand Up @@ -312,7 +320,7 @@ def test_tcp_client_timeout(self):
t.daemon = True
t.start()

uut = SocketConnection(host=socket.gethostname(), port=server.active_port, proto="tcp")
uut = TCPSocketConnection(host=socket.gethostname(), port=server.active_port)
uut.logger = logging.getLogger("SulleyUTLogger")

# When
Expand Down Expand Up @@ -350,9 +358,7 @@ def test_udp_client(self):
t.daemon = True
t.start()

uut = SocketConnection(
host=socket.gethostname(), port=server.active_port, proto="udp", bind=(socket.gethostname(), 0)
)
uut = UDPSocketConnection(host=socket.gethostname(), port=server.active_port, bind=(socket.gethostname(), 0))
uut.logger = logging.getLogger("SulleyUTLogger")

# When
Expand Down Expand Up @@ -399,12 +405,11 @@ def test_udp_broadcast_client(self):
t.daemon = True
t.start()

uut = SocketConnection(
uut = UDPSocketConnection(
host=broadcast_addr,
port=server.active_port,
proto="udp",
bind=("", server.active_port + 1),
udp_broadcast=True,
broadcast=True,
)
uut.logger = logging.getLogger("BoofuzzUTLogger")

Expand Down Expand Up @@ -446,10 +451,11 @@ def test_raw_l2(self):

# Given
server = MiniTestServer(proto="raw", host="lo")
server.data_to_send = b"GKC"
server.data_to_send = ETHER_IPV4_HEADER + b"GKC"
server.bind()

uut = SocketConnection(host="lo", proto="raw-l2", recv_timeout=0.1)
# uut = SocketConnection(host="lo", proto="raw-l2", recv_timeout=0.1)
uut = RawL2SocketConnection(interface="lo", ethernet_proto=0x0003, send_timeout=0.1, recv_timeout=0.1)
uut.logger = logging.getLogger("SulleyUTLogger")

# Assemble packet...
Expand All @@ -471,6 +477,7 @@ def test_raw_l2(self):
# When
uut.open()
send_result = uut.send(data=raw_packet)
_ = uut.recv(10000) # Discard the data we just sent ourself
received = uut.recv(10000)
uut.close()

Expand All @@ -481,7 +488,7 @@ def test_raw_l2(self):
# Then
self.assertEqual(send_result, len(expected_server_receive))
self.assertEqual(raw_packet, server.received)
self.assertEqual(received, b"")
self.assertEqual(received, server.data_to_send)

@pytest.mark.skipif(sys.platform in ["win32", "darwin"], reason="Raw sockets not supported on Windows/Mac OS.")
def test_raw_l2_max_size(self):
Expand All @@ -499,10 +506,10 @@ def test_raw_l2_max_size(self):

# Given
server = MiniTestServer(proto="raw", host="lo")
server.data_to_send = b"GKC"
server.data_to_send = ETHER_IPV4_HEADER + b"GKC"
server.bind()

uut = SocketConnection(host="lo", proto="raw-l2", recv_timeout=0.1)
uut = RawL2SocketConnection(interface="lo", ethernet_proto=0x0003, send_timeout=0.1, recv_timeout=0.1)
uut.logger = logging.getLogger("SulleyUTLogger")
data_to_send = b"1" * uut.max_send_size

Expand All @@ -517,6 +524,7 @@ def test_raw_l2_max_size(self):
# When
uut.open()
send_result = uut.send(data=raw_packet)
_ = uut.recv(10000) # Discard the data we just sent ourself
received = uut.recv(10000)
uut.close()

Expand All @@ -527,7 +535,7 @@ def test_raw_l2_max_size(self):
# Then
self.assertEqual(send_result, uut.max_send_size)
self.assertEqual(expected_server_receive, server.received)
self.assertEqual(received, b"")
self.assertEqual(received, server.data_to_send)

@pytest.mark.skipif(sys.platform in ["win32", "darwin"], reason="Raw sockets not supported on Windows/Mac OS.")
def test_raw_l2_oversized(self):
Expand All @@ -545,10 +553,10 @@ def test_raw_l2_oversized(self):

# Given
server = MiniTestServer(proto="raw", host="lo")
server.data_to_send = b"GKC"
server.data_to_send = ETHER_IPV4_HEADER + b"GKC"
server.bind()

uut = SocketConnection(host="lo", proto="raw-l2", recv_timeout=0.1)
uut = RawL2SocketConnection(interface="lo", ethernet_proto=0x0003, send_timeout=0.1, recv_timeout=0.1)
uut.logger = logging.getLogger("SulleyUTLogger")
data_to_send = b"F" * (uut.max_send_size + 1)

Expand All @@ -563,6 +571,7 @@ def test_raw_l2_oversized(self):
# When
uut.open()
send_result = uut.send(data=raw_packet)
_ = uut.recv(10000) # Discard the data we just sent ourself
received = uut.recv(10000)
uut.close()

Expand All @@ -573,7 +582,7 @@ def test_raw_l2_oversized(self):
# Then
self.assertEqual(send_result, uut.max_send_size)
self.assertEqual(expected_server_receive, server.received)
self.assertEqual(received, b"")
self.assertEqual(received, server.data_to_send)

@pytest.mark.skipif(sys.platform in ["win32", "darwin"], reason="Raw sockets not supported on Windows/Mac OS.")
def test_raw_l3(self):
Expand All @@ -595,10 +604,10 @@ def test_raw_l3(self):

# Given
server = MiniTestServer(proto="raw", host="lo")
server.data_to_send = b"GKC"
server.data_to_send = ETHER_IPV4_HEADER + b"GKC"
server.bind()

uut = SocketConnection(host="lo", proto="raw-l3")
uut = RawL3SocketConnection(interface="lo")
uut.logger = logging.getLogger("SulleyUTLogger")

# Assemble packet...
Expand All @@ -607,7 +616,7 @@ def test_raw_l3(self):
src_ip=b"\x7F\x00\x00\x01",
dst_ip=b"\x7F\x00\x00\x01",
)
expected_server_receive = b"\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00" + raw_packet
expected_server_receive = ETHER_IPV4_HEADER + raw_packet

t = threading.Thread(target=functools.partial(server.receive_until, expected_server_receive))
t.daemon = True
Expand All @@ -616,6 +625,7 @@ def test_raw_l3(self):
# When
uut.open()
send_result = uut.send(data=raw_packet)
_ = uut.recv(10000) # Discard the data we just sent ourself
received = uut.recv(10000)
uut.close()

Expand All @@ -626,7 +636,7 @@ def test_raw_l3(self):
# Then
self.assertEqual(send_result, len(raw_packet))
self.assertEqual(expected_server_receive, server.received)
self.assertEqual(received, raw_packet)
self.assertEqual(received, b"GKC")

@pytest.mark.skipif(sys.platform in ["win32", "darwin"], reason="Raw sockets not supported on Windows/Mac OS.")
def test_raw_l3_max_size(self):
Expand All @@ -644,16 +654,16 @@ def test_raw_l3_max_size(self):

# Given
server = MiniTestServer(proto="raw", host="lo")
server.data_to_send = b"GKC"
server.data_to_send = ETHER_IPV4_HEADER + b"GKC"
server.bind()

uut = SocketConnection(host="lo", proto="raw-l3")
uut = RawL3SocketConnection(interface="lo")
uut.logger = logging.getLogger("SulleyUTLogger")
data_to_send = b"0" * uut.packet_size

# Assemble packet...
raw_packet = data_to_send
expected_server_receive = b"\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00" + raw_packet
expected_server_receive = ETHER_IPV4_HEADER + raw_packet

t = threading.Thread(target=functools.partial(server.receive_until, expected_server_receive))
t.daemon = True
Expand All @@ -662,6 +672,7 @@ def test_raw_l3_max_size(self):
# When
uut.open()
send_result = uut.send(data=raw_packet)
_ = uut.recv(10000) # Discard the data we just sent ourself
received = uut.recv(10000)
uut.close()

Expand All @@ -672,7 +683,7 @@ def test_raw_l3_max_size(self):
# Then
self.assertEqual(send_result, uut.packet_size)
self.assertEqual(expected_server_receive, server.received)
self.assertEqual(received, data_to_send)
self.assertEqual(received, b"GKC")

@pytest.mark.skipif(sys.platform in ["win32", "darwin"], reason="Raw sockets not supported on Windows/Mac OS.")
def test_raw_l3_oversized(self):
Expand All @@ -690,18 +701,16 @@ def test_raw_l3_oversized(self):

# Given
server = MiniTestServer(proto="raw", host="lo")
server.data_to_send = b"GKC"
server.data_to_send = ETHER_IPV4_HEADER + b"GKC"
server.bind()

uut = SocketConnection(host="lo", proto="raw-l3")
uut = RawL3SocketConnection(interface="lo")
uut.logger = logging.getLogger("SulleyUTLogger")
data_to_send = b"D" * (uut.packet_size + 1)

# Assemble packet...
raw_packet = data_to_send
expected_server_receive = (
b"\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00" + raw_packet[: uut.packet_size]
)
expected_server_receive = ETHER_IPV4_HEADER + raw_packet[: uut.packet_size]

t = threading.Thread(target=functools.partial(server.receive_until, expected_server_receive))
t.daemon = True
Expand All @@ -710,6 +719,7 @@ def test_raw_l3_oversized(self):
# When
uut.open()
send_result = uut.send(data=raw_packet)
_ = uut.recv(10000) # Discard the data we just sent ourself
received = uut.recv(10000)
uut.close()

Expand All @@ -720,7 +730,7 @@ def test_raw_l3_oversized(self):
# Then
self.assertEqual(send_result, uut.packet_size)
self.assertEqual(expected_server_receive, server.received)
self.assertEqual(received, data_to_send[:-1])
self.assertEqual(received, b"GKC")

def test_required_args_port(self):
"""
Expand Down

0 comments on commit 2f1ea5c

Please sign in to comment.