Skip to content

Commit

Permalink
[feat] IPv4 header checksum and total_length compute functions
Browse files Browse the repository at this point in the history
  • Loading branch information
quentinlampin committed Jan 6, 2025
1 parent 267bee5 commit 42eabe9
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 5 deletions.
4 changes: 3 additions & 1 deletion microschc/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from microschc.binary import Buffer
from microschc.protocol.compute import ComputeFunctionDependenciesType, ComputeFunctionType


from .ipv4 import IPv4ComputeFunctions, IPv4Fields
from .ipv6 import IPv6ComputeFunctions, IPv6Fields
from .udp import UDPComputeFunctions, UDPFields

ComputeFunctions: Dict[str, Tuple[ComputeFunctionType, ComputeFunctionDependenciesType]] = {
IPv4Fields.TOTAL_LENGTH: IPv4ComputeFunctions[IPv4Fields.TOTAL_LENGTH],
IPv4Fields.HEADER_CHECKSUM: IPv4ComputeFunctions[IPv4Fields.HEADER_CHECKSUM],
IPv6Fields.PAYLOAD_LENGTH: IPv6ComputeFunctions[IPv6Fields.PAYLOAD_LENGTH],
UDPFields.LENGTH: UDPComputeFunctions[UDPFields.LENGTH],
UDPFields.CHECKSUM: UDPComputeFunctions[UDPFields.CHECKSUM]
Expand Down
62 changes: 60 additions & 2 deletions microschc/protocol/ipv4.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
"""

from enum import Enum
from typing import List, Type
from microschc.binary.buffer import Buffer
from functools import reduce
from typing import Dict, List, Tuple, Type
from microschc.binary.buffer import Buffer, Padding
from microschc.parser import HeaderParser, ParserError
from microschc.protocol.compute import ComputeFunctionDependenciesType, ComputeFunctionType
from microschc.protocol.registry import PARSERS, REGISTER_PARSER, ProtocolsIDs
from microschc.rfc8724 import FieldDescriptor, HeaderDescriptor

Expand Down Expand Up @@ -207,5 +209,61 @@ def parse(self, buffer:bytes) -> HeaderDescriptor:

# cursor += option_offset

def _compute_total_length( decompressed_fields: List[Tuple[str, Buffer]], rule_field_position:int) -> Buffer:
fields_values: List[Buffer] = [field_value for _, field_value in decompressed_fields]
ipv4_fields: List[Buffer] = [field for field in fields_values[rule_field_position-2:]]
ipv4_buffer: Buffer = reduce(lambda x, y: x+y, ipv4_fields, Buffer(content=b'', length=0))

total_length: int = ipv4_buffer.length // 8 if ipv4_buffer.length%8 == 0 else ipv4_buffer.length // 8 + 1
buffer: Buffer = Buffer(content=total_length.to_bytes(2, 'big'), length=16, padding=Padding.LEFT)
return buffer

def _compute_checksum(decompressed_fields: List[Tuple[str, Buffer]], rule_field_position: int) -> Buffer:
"""
Checksum is the 16-bit one's complement of the one's complement sum of a
IPv4 header.
If the computed checksum is zero, it is transmitted as all ones (the
equivalent in one's complement arithmetic). An all zero transmitted
checksum value means that the transmitter generated no checksum (for
debugging or for higher level protocols that don't care).
"""

# retrieve IPv4 header fields
ipv4_header_fields: List[Buffer] = [ field_value for _, field_value in decompressed_fields[rule_field_position-9:rule_field_position+3]]
ipv4_header: Buffer = reduce(lambda x, y: x+y, ipv4_header_fields, Buffer(content=b'', length=0))

checksum_value: int = 0
header_checksum: int = 0
# compute the sum of the 2-bytes chunks of the IPv4 header
for chunk in ipv4_header.chunks(length=16):
header_checksum += chunk.value(type='unsigned int')
carry = header_checksum >> 16
header_checksum = (header_checksum + carry) & 0xffff

checksum_value = ~header_checksum & 0xffff

# if checksum is 0x0000 return 0xffff
checksum_value = 0xffff if checksum_value == 0x0000 else checksum_value
checksum_buffer: Buffer = Buffer(content=checksum_value.to_bytes(2, 'big'), length=16)
return checksum_buffer


IPv4ComputeFunctions: Dict[str, Tuple[ComputeFunctionType, ComputeFunctionDependenciesType]] = {
IPv4Fields.TOTAL_LENGTH: (_compute_total_length, {}),
IPv4Fields.HEADER_CHECKSUM: (_compute_checksum, {
IPv4Fields.VERSION,
IPv4Fields.HEADER_LENGTH,
IPv4Fields.TYPE_OF_SERVICE,
IPv4Fields.TOTAL_LENGTH,
IPv4Fields.IDENTIFICATION,
IPv4Fields.FLAGS,
IPv4Fields.FRAGMENT_OFFSET,
IPv4Fields.TIME_TO_LIVE,
IPv4Fields.PROTOCOL,
IPv4Fields.SRC_ADDRESS,
IPv4Fields.DST_ADDRESS,
}),
}

REGISTER_PARSER(protocol_id=ProtocolsIDs.IPV4, parser_class=IPv4Parser)
112 changes: 110 additions & 2 deletions tests/protocol/test_ipv4.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from microschc.protocol.ipv4 import IPv4Parser, IPv4Fields
from typing import List, Tuple
from microschc.protocol.ipv4 import IPv4Parser, IPv4Fields, IPv4ComputeFunctions
from microschc.parser.parser import HeaderDescriptor
from microschc.rfc8724 import FieldDescriptor
from microschc.binary.buffer import Buffer
from microschc.rfc8724extras import ParserDefinitions

def test_ipv4_parser_import():
"""test: IPv6 header parser import and instanciation
Expand All @@ -13,7 +15,7 @@ def test_ipv4_parser_import():
def test_ipv4_parser_parse():
"""test: IPv4 header parser parses IPv4 packet
The packet is made of an IPv6 header with following fields:
The packet is made of an IPv4 header with following fields:
- id='Version' length=4 position=0 value=b'\x04'
- id='Header Length' length=4 position=0 value=b'\x05'
- id='Type of Service' length=8 position=0 value=b'\x00'
Expand Down Expand Up @@ -110,5 +112,111 @@ def test_ipv4_parser_parse():
assert destination_address_fd.value == Buffer(content=b'\xac\x1e\x01\x02', length=32)


def test_ipv4_compute_total_length():

parser:IPv4Parser = IPv4Parser()
partially_reconstructed_ipv4_content:bytes = bytes(b"\x45\x00\x00\x00\x21\xfa\x40\x00\x40\x11\xbc\x52\xac\x1e\x01\x08" # total length is \x00\x00, should be \x02\x5c
b"\xac\x1e\x01\x02")
ipv4_payload_content:bytes = bytes(b"\xc8\xe2\x16\x33\x02\x46\x5c\x9e"
b"\x48\x02\xba\xbf\x98\x13\xbd\x0e\x51\x2d\xbc\xf7\xb2\x72\x64\x11"
b"\x28\x33\x62\x3d\x55\x09\x6c\x77\x6d\x32\x6d\x3d\x31\x2e\x31\x06"
b"\x6c\x74\x3d\x33\x30\x30\x0d\x02\x65\x70\x3d\x65\x32\x63\x61\x37"
b"\x38\x37\x65\x64\x32\x66\x35\xc1\x0d\xd2\x14\x07\x2b\xff\x3c\x2f"
b"\x3e\x3b\x72\x74\x3d\x22\x6f\x6d\x61\x2e\x6c\x77\x6d\x32\x6d\x22"
b"\x3b\x63\x74\x3d\x22\x36\x30\x20\x31\x31\x30\x20\x31\x31\x32\x20"
b"\x31\x31\x35\x34\x32\x20\x31\x31\x35\x34\x33\x22\x2c\x3c\x2f\x31"
b"\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x31\x2f\x30\x3e"
b"\x2c\x3c\x2f\x33\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x32\x2c\x3c\x2f"
b"\x33\x2f\x30\x3e\x2c\x3c\x2f\x36\x2f\x30\x3e\x2c\x3c\x2f\x33\x33"
b"\x30\x31\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33"
b"\x30\x31\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x32\x3e\x3b\x76\x65"
b"\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x32\x2f\x30\x3e\x2c"
b"\x3c\x2f\x33\x33\x30\x33\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c"
b"\x3c\x2f\x33\x33\x30\x33\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x34"
b"\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x34"
b"\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x35\x3e\x3b\x76\x65\x72\x3d"
b"\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x35\x2f\x30\x3e\x2c\x3c\x2f"
b"\x33\x33\x30\x36\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f"
b"\x33\x33\x30\x36\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x38\x3e\x3b"
b"\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x38\x2f\x30"
b"\x3e\x2c\x3c\x2f\x33\x33\x31\x30\x3e\x3b\x76\x65\x72\x3d\x31\x2e"
b"\x31\x2c\x3c\x2f\x33\x33\x31\x30\x2f\x30\x3e\x2c\x3c\x2f\x33\x33"
b"\x31\x31\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x32\x3e\x3b\x76\x65"
b"\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x32\x2f\x30\x3e\x2c"
b"\x3c\x2f\x33\x33\x31\x33\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c"
b"\x3c\x2f\x33\x33\x31\x33\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x34"
b"\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x34"
b"\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x35\x3e\x3b\x76\x65\x72\x3d"
b"\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x35\x2f\x30\x3e\x2c\x3c\x2f"
b"\x33\x33\x31\x36\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f"
b"\x33\x33\x31\x36\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x37\x3e\x3b"
b"\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x37\x2f\x30"
b"\x3e\x2c\x3c\x2f\x33\x33\x31\x38\x3e\x3b\x76\x65\x72\x3d\x31\x2e"
b"\x31\x2c\x3c\x2f\x33\x33\x31\x38\x2f\x30\x3e\x2c\x3c\x2f\x33\x33"
b"\x31\x39\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f"
)

partially_reconstructed_ipv4_packet_buffer: Buffer = Buffer(content=partially_reconstructed_ipv4_content, length=len(partially_reconstructed_ipv4_content)*8)
ipv4_payload_buffer: Buffer = Buffer(content=ipv4_payload_content, length=len(ipv4_payload_content)*8)
ipv4_header_descriptor: HeaderDescriptor = parser.parse(buffer=partially_reconstructed_ipv4_packet_buffer)
decompressed_fields: List[Tuple[str, Buffer]] = [ (field.id,field.value) for field in ipv4_header_descriptor.fields]
decompressed_fields.append((ParserDefinitions.PAYLOAD, ipv4_payload_buffer))


total_length_buffer: Buffer = IPv4ComputeFunctions[IPv4Fields.TOTAL_LENGTH][0](decompressed_fields, 2)
assert total_length_buffer == Buffer(content=b'\x02\x5a', length=16)


def test_ipv4_compute_checksum():

parser:IPv4Parser = IPv4Parser()
partially_reconstructed_ipv4_content:bytes = bytes(b"\x45\x00\x02\x5a\x21\xfa\x40\x00\x40\x11\x00\x00\xac\x1e\x01\x08" # checksum is \x00\x00, should be \xbc\x52
b"\xac\x1e\x01\x02")
ipv4_payload_content:bytes = bytes(b"\xc8\xe2\x16\x33\x02\x46\x5c\x9e"
b"\x48\x02\xba\xbf\x98\x13\xbd\x0e\x51\x2d\xbc\xf7\xb2\x72\x64\x11"
b"\x28\x33\x62\x3d\x55\x09\x6c\x77\x6d\x32\x6d\x3d\x31\x2e\x31\x06"
b"\x6c\x74\x3d\x33\x30\x30\x0d\x02\x65\x70\x3d\x65\x32\x63\x61\x37"
b"\x38\x37\x65\x64\x32\x66\x35\xc1\x0d\xd2\x14\x07\x2b\xff\x3c\x2f"
b"\x3e\x3b\x72\x74\x3d\x22\x6f\x6d\x61\x2e\x6c\x77\x6d\x32\x6d\x22"
b"\x3b\x63\x74\x3d\x22\x36\x30\x20\x31\x31\x30\x20\x31\x31\x32\x20"
b"\x31\x31\x35\x34\x32\x20\x31\x31\x35\x34\x33\x22\x2c\x3c\x2f\x31"
b"\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x31\x2f\x30\x3e"
b"\x2c\x3c\x2f\x33\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x32\x2c\x3c\x2f"
b"\x33\x2f\x30\x3e\x2c\x3c\x2f\x36\x2f\x30\x3e\x2c\x3c\x2f\x33\x33"
b"\x30\x31\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33"
b"\x30\x31\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x32\x3e\x3b\x76\x65"
b"\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x32\x2f\x30\x3e\x2c"
b"\x3c\x2f\x33\x33\x30\x33\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c"
b"\x3c\x2f\x33\x33\x30\x33\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x34"
b"\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x34"
b"\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x35\x3e\x3b\x76\x65\x72\x3d"
b"\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x35\x2f\x30\x3e\x2c\x3c\x2f"
b"\x33\x33\x30\x36\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f"
b"\x33\x33\x30\x36\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x30\x38\x3e\x3b"
b"\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x30\x38\x2f\x30"
b"\x3e\x2c\x3c\x2f\x33\x33\x31\x30\x3e\x3b\x76\x65\x72\x3d\x31\x2e"
b"\x31\x2c\x3c\x2f\x33\x33\x31\x30\x2f\x30\x3e\x2c\x3c\x2f\x33\x33"
b"\x31\x31\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x32\x3e\x3b\x76\x65"
b"\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x32\x2f\x30\x3e\x2c"
b"\x3c\x2f\x33\x33\x31\x33\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c"
b"\x3c\x2f\x33\x33\x31\x33\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x34"
b"\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x34"
b"\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x35\x3e\x3b\x76\x65\x72\x3d"
b"\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x35\x2f\x30\x3e\x2c\x3c\x2f"
b"\x33\x33\x31\x36\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f"
b"\x33\x33\x31\x36\x2f\x30\x3e\x2c\x3c\x2f\x33\x33\x31\x37\x3e\x3b"
b"\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f\x33\x33\x31\x37\x2f\x30"
b"\x3e\x2c\x3c\x2f\x33\x33\x31\x38\x3e\x3b\x76\x65\x72\x3d\x31\x2e"
b"\x31\x2c\x3c\x2f\x33\x33\x31\x38\x2f\x30\x3e\x2c\x3c\x2f\x33\x33"
b"\x31\x39\x3e\x3b\x76\x65\x72\x3d\x31\x2e\x31\x2c\x3c\x2f"
)

partially_reconstructed_ipv4_packet_buffer: Buffer = Buffer(content=partially_reconstructed_ipv4_content, length=len(partially_reconstructed_ipv4_content)*8)
ipv4_payload_buffer: Buffer = Buffer(content=ipv4_payload_content, length=len(ipv4_payload_content)*8)
ipv4_header_descriptor: HeaderDescriptor = parser.parse(buffer=partially_reconstructed_ipv4_packet_buffer)
decompressed_fields: List[Tuple[str, Buffer]] = [ (field.id,field.value) for field in ipv4_header_descriptor.fields]
decompressed_fields.append((ParserDefinitions.PAYLOAD, ipv4_payload_buffer))


header_checksum_buffer: Buffer = IPv4ComputeFunctions[IPv4Fields.HEADER_CHECKSUM][0](decompressed_fields, 9)
assert header_checksum_buffer == Buffer(content=b'\xbc\x52', length=16)

0 comments on commit 42eabe9

Please sign in to comment.