diff --git a/microschc/protocol/__init__.py b/microschc/protocol/__init__.py index 412cc28..f4774ce 100644 --- a/microschc/protocol/__init__.py +++ b/microschc/protocol/__init__.py @@ -3,11 +3,13 @@ from microschc.binary import Buffer from microschc.protocol.compute import ComputeFunctionDependenciesType, ComputeFunctionType - +from .ipv4 import IPv4ComputeFunctions, IPv4Fields from .ipv6 import IPv6ComputeFunctions, IPv6Fields from .udp import UDPComputeFunctions, UDPFields ComputeFunctions: Dict[str, Tuple[ComputeFunctionType, ComputeFunctionDependenciesType]] = { + IPv4Fields.TOTAL_LENGTH: IPv4ComputeFunctions[IPv4Fields.TOTAL_LENGTH], + IPv4Fields.HEADER_CHECKSUM: IPv4ComputeFunctions[IPv4Fields.HEADER_CHECKSUM], IPv6Fields.PAYLOAD_LENGTH: IPv6ComputeFunctions[IPv6Fields.PAYLOAD_LENGTH], UDPFields.LENGTH: UDPComputeFunctions[UDPFields.LENGTH], UDPFields.CHECKSUM: UDPComputeFunctions[UDPFields.CHECKSUM] diff --git a/microschc/protocol/ipv4.py b/microschc/protocol/ipv4.py index 653792b..d522d4d 100644 --- a/microschc/protocol/ipv4.py +++ b/microschc/protocol/ipv4.py @@ -10,9 +10,11 @@ """ from enum import Enum -from typing import List, Type -from microschc.binary.buffer import Buffer +from functools import reduce +from typing import Dict, List, Tuple, Type +from microschc.binary.buffer import Buffer, Padding from microschc.parser import HeaderParser, ParserError +from microschc.protocol.compute import ComputeFunctionDependenciesType, ComputeFunctionType from microschc.protocol.registry import PARSERS, REGISTER_PARSER, ProtocolsIDs from microschc.rfc8724 import FieldDescriptor, HeaderDescriptor @@ -207,5 +209,61 @@ def parse(self, buffer:bytes) -> HeaderDescriptor: # cursor += option_offset +def _compute_total_length( decompressed_fields: List[Tuple[str, Buffer]], rule_field_position:int) -> Buffer: + fields_values: List[Buffer] = [field_value for _, field_value in decompressed_fields] + ipv4_fields: List[Buffer] = [field for field in fields_values[rule_field_position-2:]] + ipv4_buffer: Buffer = reduce(lambda x, y: x+y, ipv4_fields, Buffer(content=b'', length=0)) + + total_length: int = ipv4_buffer.length // 8 if ipv4_buffer.length%8 == 0 else ipv4_buffer.length // 8 + 1 + buffer: Buffer = Buffer(content=total_length.to_bytes(2, 'big'), length=16, padding=Padding.LEFT) + return buffer + +def _compute_checksum(decompressed_fields: List[Tuple[str, Buffer]], rule_field_position: int) -> Buffer: + """ + Checksum is the 16-bit one's complement of the one's complement sum of a + IPv4 header. + + If the computed checksum is zero, it is transmitted as all ones (the + equivalent in one's complement arithmetic). An all zero transmitted + checksum value means that the transmitter generated no checksum (for + debugging or for higher level protocols that don't care). + """ + + # retrieve IPv4 header fields + ipv4_header_fields: List[Buffer] = [ field_value for _, field_value in decompressed_fields[rule_field_position-9:rule_field_position+3]] + ipv4_header: Buffer = reduce(lambda x, y: x+y, ipv4_header_fields, Buffer(content=b'', length=0)) + + checksum_value: int = 0 + header_checksum: int = 0 + # compute the sum of the 2-bytes chunks of the IPv4 header + for chunk in ipv4_header.chunks(length=16): + header_checksum += chunk.value(type='unsigned int') + carry = header_checksum >> 16 + header_checksum = (header_checksum + carry) & 0xffff + + checksum_value = ~header_checksum & 0xffff + + # if checksum is 0x0000 return 0xffff + checksum_value = 0xffff if checksum_value == 0x0000 else checksum_value + checksum_buffer: Buffer = Buffer(content=checksum_value.to_bytes(2, 'big'), length=16) + return checksum_buffer + + +IPv4ComputeFunctions: Dict[str, Tuple[ComputeFunctionType, ComputeFunctionDependenciesType]] = { + IPv4Fields.TOTAL_LENGTH: (_compute_total_length, {}), + IPv4Fields.HEADER_CHECKSUM: (_compute_checksum, { + IPv4Fields.VERSION, + IPv4Fields.HEADER_LENGTH, + IPv4Fields.TYPE_OF_SERVICE, + IPv4Fields.TOTAL_LENGTH, + IPv4Fields.IDENTIFICATION, + IPv4Fields.FLAGS, + IPv4Fields.FRAGMENT_OFFSET, + IPv4Fields.TIME_TO_LIVE, + IPv4Fields.PROTOCOL, + IPv4Fields.SRC_ADDRESS, + IPv4Fields.DST_ADDRESS, + }), +} REGISTER_PARSER(protocol_id=ProtocolsIDs.IPV4, parser_class=IPv4Parser) \ No newline at end of file diff --git a/tests/protocol/test_ipv4.py b/tests/protocol/test_ipv4.py index 8aa70c6..033f961 100644 --- a/tests/protocol/test_ipv4.py +++ b/tests/protocol/test_ipv4.py @@ -1,7 +1,9 @@ -from microschc.protocol.ipv4 import IPv4Parser, IPv4Fields +from typing import List, Tuple +from microschc.protocol.ipv4 import IPv4Parser, IPv4Fields, IPv4ComputeFunctions from microschc.parser.parser import HeaderDescriptor from microschc.rfc8724 import FieldDescriptor from microschc.binary.buffer import Buffer +from microschc.rfc8724extras import ParserDefinitions def test_ipv4_parser_import(): """test: IPv6 header parser import and instanciation @@ -13,7 +15,7 @@ def test_ipv4_parser_import(): def test_ipv4_parser_parse(): """test: IPv4 header parser parses IPv4 packet - The packet is made of an IPv6 header with following fields: + The packet is made of an IPv4 header with following fields: - id='Version' length=4 position=0 value=b'\x04' - id='Header Length' length=4 position=0 value=b'\x05' - id='Type of Service' length=8 position=0 value=b'\x00' @@ -110,5 +112,111 @@ def test_ipv4_parser_parse(): assert destination_address_fd.value == Buffer(content=b'\xac\x1e\x01\x02', length=32) +def test_ipv4_compute_total_length(): + parser:IPv4Parser = IPv4Parser() + partially_reconstructed_ipv4_content:bytes = bytes(b"\x45\x00\x00\x00\x21\xfa\x40\x00\x40\x11\xbc\x52\xac\x1e\x01\x08" # total length is \x00\x00, should be \x02\x5c + b"\xac\x1e\x01\x02") + ipv4_payload_content:bytes = bytes(b"\xc8\xe2\x16\x33\x02\x46\x5c\x9e" + b"\x48\x02\xba\xbf\x98\x13\xbd\x0e\x51\x2d\xbc\xf7\xb2\x72\x64\x11" + b"\x28\x33\x62\x3d\x55\x09\x6c\x77\x6d\x32\x6d\x3d\x31\x2e\x31\x06" + b"\x6c\x74\x3d\x33\x30\x30\x0d\x02\x65\x70\x3d\x65\x32\x63\x61\x37" + b"\x38\x37\x65\x64\x32\x66\x35\xc1\x0d\xd2\x14\x07\x2b\xff\x3c\x2f" + b"\x3e\x3b\x72\x74\x3d\x22\x6f\x6d\x61\x2e\x6c\x77\x6d\x32\x6d\x22" + b"\x3b\x63\x74\x3d\x22\x36\x30\x20\x31\x31\x30\x20\x31\x31\x32\x20" + b"\x31\x31\x35\x34\x32\x20\x31\x31\x35\x34\x33\x22\x2c\x3c\x2f\x31" + b"\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x31\x2f\x30\x3e" + b"\x2c\x3c\x2f\x33\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x32\x2c\x3c\x2f" + b"\x33\x2f\x30\x3e\x2c\x3c\x2f\x36\x2f\x30\x3e\x2c\x3c\x2f\x33\x33" + b"\x30\x31\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33" + b"\x30\x31\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x32\x3e\x3b\x76\x65" + b"\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x32\x2f\x30\x3e\x2c" + b"\x3c\x2f\x33\x33\x30\x33\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c" + b"\x3c\x2f\x33\x33\x30\x33\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x34" + b"\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x34" + b"\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x35\x3e\x3b\x76\x65\x72\x3d" + b"\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x35\x2f\x30\x3e\x2c\x3c\x2f" + b"\x33\x33\x30\x36\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f" + b"\x33\x33\x30\x36\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x38\x3e\x3b" + b"\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x38\x2f\x30" + b"\x3e\x2c\x3c\x2f\x33\x33\x31\x30\x3e\x3b\x76\x65\x72\x3d\x31\x2e" + b"\x31\x2c\x3c\x2f\x33\x33\x31\x30\x2f\x30\x3e\x2c\x3c\x2f\x33\x33" + b"\x31\x31\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x32\x3e\x3b\x76\x65" + b"\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x32\x2f\x30\x3e\x2c" + b"\x3c\x2f\x33\x33\x31\x33\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c" + b"\x3c\x2f\x33\x33\x31\x33\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x34" + b"\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x34" + b"\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x35\x3e\x3b\x76\x65\x72\x3d" + b"\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x35\x2f\x30\x3e\x2c\x3c\x2f" + b"\x33\x33\x31\x36\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f" + b"\x33\x33\x31\x36\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x37\x3e\x3b" + b"\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x37\x2f\x30" + b"\x3e\x2c\x3c\x2f\x33\x33\x31\x38\x3e\x3b\x76\x65\x72\x3d\x31\x2e" + b"\x31\x2c\x3c\x2f\x33\x33\x31\x38\x2f\x30\x3e\x2c\x3c\x2f\x33\x33" + b"\x31\x39\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f" + ) + + partially_reconstructed_ipv4_packet_buffer: Buffer = Buffer(content=partially_reconstructed_ipv4_content, length=len(partially_reconstructed_ipv4_content)*8) + ipv4_payload_buffer: Buffer = Buffer(content=ipv4_payload_content, length=len(ipv4_payload_content)*8) + ipv4_header_descriptor: HeaderDescriptor = parser.parse(buffer=partially_reconstructed_ipv4_packet_buffer) + decompressed_fields: List[Tuple[str, Buffer]] = [ (field.id,field.value) for field in ipv4_header_descriptor.fields] + decompressed_fields.append((ParserDefinitions.PAYLOAD, ipv4_payload_buffer)) + + + total_length_buffer: Buffer = IPv4ComputeFunctions[IPv4Fields.TOTAL_LENGTH][0](decompressed_fields, 2) + assert total_length_buffer == Buffer(content=b'\x02\x5a', length=16) + + +def test_ipv4_compute_checksum(): + + parser:IPv4Parser = IPv4Parser() + partially_reconstructed_ipv4_content:bytes = bytes(b"\x45\x00\x02\x5a\x21\xfa\x40\x00\x40\x11\x00\x00\xac\x1e\x01\x08" # checksum is \x00\x00, should be \xbc\x52 + b"\xac\x1e\x01\x02") + ipv4_payload_content:bytes = bytes(b"\xc8\xe2\x16\x33\x02\x46\x5c\x9e" + b"\x48\x02\xba\xbf\x98\x13\xbd\x0e\x51\x2d\xbc\xf7\xb2\x72\x64\x11" + b"\x28\x33\x62\x3d\x55\x09\x6c\x77\x6d\x32\x6d\x3d\x31\x2e\x31\x06" + b"\x6c\x74\x3d\x33\x30\x30\x0d\x02\x65\x70\x3d\x65\x32\x63\x61\x37" + b"\x38\x37\x65\x64\x32\x66\x35\xc1\x0d\xd2\x14\x07\x2b\xff\x3c\x2f" + b"\x3e\x3b\x72\x74\x3d\x22\x6f\x6d\x61\x2e\x6c\x77\x6d\x32\x6d\x22" + b"\x3b\x63\x74\x3d\x22\x36\x30\x20\x31\x31\x30\x20\x31\x31\x32\x20" + b"\x31\x31\x35\x34\x32\x20\x31\x31\x35\x34\x33\x22\x2c\x3c\x2f\x31" + b"\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x31\x2f\x30\x3e" + b"\x2c\x3c\x2f\x33\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x32\x2c\x3c\x2f" + b"\x33\x2f\x30\x3e\x2c\x3c\x2f\x36\x2f\x30\x3e\x2c\x3c\x2f\x33\x33" + b"\x30\x31\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33" + b"\x30\x31\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x32\x3e\x3b\x76\x65" + b"\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x32\x2f\x30\x3e\x2c" + b"\x3c\x2f\x33\x33\x30\x33\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c" + b"\x3c\x2f\x33\x33\x30\x33\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x34" + b"\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x34" + b"\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x35\x3e\x3b\x76\x65\x72\x3d" + b"\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x35\x2f\x30\x3e\x2c\x3c\x2f" + b"\x33\x33\x30\x36\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f" + b"\x33\x33\x30\x36\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x38\x3e\x3b" + b"\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x38\x2f\x30" + b"\x3e\x2c\x3c\x2f\x33\x33\x31\x30\x3e\x3b\x76\x65\x72\x3d\x31\x2e" + b"\x31\x2c\x3c\x2f\x33\x33\x31\x30\x2f\x30\x3e\x2c\x3c\x2f\x33\x33" + b"\x31\x31\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x32\x3e\x3b\x76\x65" + b"\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x32\x2f\x30\x3e\x2c" + b"\x3c\x2f\x33\x33\x31\x33\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c" + b"\x3c\x2f\x33\x33\x31\x33\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x34" + b"\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x34" + b"\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x35\x3e\x3b\x76\x65\x72\x3d" + b"\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x35\x2f\x30\x3e\x2c\x3c\x2f" + b"\x33\x33\x31\x36\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f" + b"\x33\x33\x31\x36\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x37\x3e\x3b" + b"\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x37\x2f\x30" + b"\x3e\x2c\x3c\x2f\x33\x33\x31\x38\x3e\x3b\x76\x65\x72\x3d\x31\x2e" + b"\x31\x2c\x3c\x2f\x33\x33\x31\x38\x2f\x30\x3e\x2c\x3c\x2f\x33\x33" + b"\x31\x39\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f" + ) + + partially_reconstructed_ipv4_packet_buffer: Buffer = Buffer(content=partially_reconstructed_ipv4_content, length=len(partially_reconstructed_ipv4_content)*8) + ipv4_payload_buffer: Buffer = Buffer(content=ipv4_payload_content, length=len(ipv4_payload_content)*8) + ipv4_header_descriptor: HeaderDescriptor = parser.parse(buffer=partially_reconstructed_ipv4_packet_buffer) + decompressed_fields: List[Tuple[str, Buffer]] = [ (field.id,field.value) for field in ipv4_header_descriptor.fields] + decompressed_fields.append((ParserDefinitions.PAYLOAD, ipv4_payload_buffer)) + + header_checksum_buffer: Buffer = IPv4ComputeFunctions[IPv4Fields.HEADER_CHECKSUM][0](decompressed_fields, 9) + assert header_checksum_buffer == Buffer(content=b'\xbc\x52', length=16) \ No newline at end of file