diff --git a/README.md b/README.md index 2bb0a5a..f1a90e4 100644 --- a/README.md +++ b/README.md @@ -24,8 +24,6 @@ hatch build -t wheel pip install dist/microschc--py3-none-any.whl ``` - - ## microSCHC, developpement plan microSCHC aims at implementing the SCHC Compression/Decompression (C/D) and Fragmentation/Reassembly (F/R) routines described in RFC 8724 [1]. @@ -56,10 +54,10 @@ Current features: - [ ] AppIID - [ ] compute-* (e.g. UDP-checksum) 2. Decompression counteparts - - [ ] not-sent - - [ ] value-sent - - [ ] mapping-sent - - [ ] LSB + - [x] not-sent + - [x] value-sent + - [x] mapping-sent + - [x] LSB - [ ] devIID - [ ] AppIID - [ ] compute-* (e.g. UDP-checksum) diff --git a/microschc-implementation.md b/microschc-implementation.md index f74d31a..d333b36 100644 --- a/microschc-implementation.md +++ b/microschc-implementation.md @@ -53,15 +53,24 @@ structure of header fields represented here-after. The actual option requires parsing, and interpreting the `Option Delta`, `Option Length`, etc. Similarly to the types support discussion, this requires keeping track of the encoding and decoding of those interpretations into their bytes counterparts. Furthermore, how the compression residue should be computed based on the interpretation is a mystery to me (Quentin), unless simple compression actions are performed, e.g. `not-sent`. For this reason, microSCHC parser only exposes fields and their raw content (except for the integers odd case). -## 2. The Ruler +## 2. Ruler The Ruler is in charge of the rules and their application to packets, i.e.: - rule storing: manages a collection of rules. - - rule matching: determine if a rule applies to a packet descriptor. + - rule matching for packet descriptors: determine if a rule applies to a packet descriptor. + - rule matching for SCHC packet: determine the rule descriptor corresponding to a SCHC packet. - packet compression: compress packets according to matching rules. - packet decompression: decompress packets according to rules IDs. -## 2.1 Rules field descriptors order matters +## 2.1 Rule ID + +microSCHC expects rule IDs to be provided as right-aligned (left zero-padding) bytes. microSCHC further expects rule ID length to be strictly greater than 0, as opposed to what's defined in [4], section 5: + +```text +A length of 0 is allowed to represent an implicit rule. +``` + +## 2.2 Rules field descriptors order matters The Ruler makes the assumption that rules' field descriptors are provided in the same order as in the target packet structure. For example, if the packet fields are, in order : [`field-1`, `field-2`, `field-3`, ...], it is supposed that rules field descriptors @@ -75,14 +84,23 @@ it is mandated that field descriptors applying to a given packet, i.e. once the The rationale is that unordered field descriptors eventually yield fields residues in a different order than that of the source packet. This means that the order is potentially lost at the reconstruction, leading to advert effects, including reconstructed packets different from the source packets. -## 2.2 Default Compression/Decompression rule, implementation details +## 2.3 Default Compression/Decompression rule, implementation details microSCHC expects the default Compression/Decompression rule is provided last in the list of rules. The default rule list of fields descriptors is assumed empty and matches any packet not matched by any prior rule. -## 2.3 Variable field length +## 2.4 Variable field length In microSCHC, a field of variable length is denoted with a 0 value of the FL attribute of the corresponding Field Descriptor. - [1] "Scapy, packet crafting for Python2 and Python3" - [2] "OpenSCHC: Open implementation, hackathon support, ... of the IETF SCHC protocol (compression for LPWANs), https://github.com/openschc" - [3] "RFC 7252 The Constrained Application Protocol (CoAP), Z. Shelby et al." +- [4] "Draft: Data Model for Static Context Header Compression (SCHC)", A. Minaburo et al. + +## 3. Compressor + +The compressor is in charge of the compression of a packet, provided as a packet descriptor by the parser, using the rule identified by the **Ruler**. +The compression procedure is: + - compress packet fields using compression actions defined in the corresponding field descriptors of the rule. + - concatenate fields residues prepended with the rule ID, removing (left)-padding in the process. + diff --git a/microschc/__init__.py b/microschc/__init__.py index 2b8877c..a71c5c7 100644 --- a/microschc/__init__.py +++ b/microschc/__init__.py @@ -1 +1 @@ -__version__ = '0.5.0' +__version__ = '0.7.0' diff --git a/microschc/actions/compression.py b/microschc/actions/compression.py index 8799bef..23857ac 100644 --- a/microschc/actions/compression.py +++ b/microschc/actions/compression.py @@ -22,21 +22,20 @@ """ +from microschc.binary.buffer import Buffer +from microschc.rfc8724 import FieldDescriptor, MatchMapping -from math import ceil -from microschc.rfc8724 import FieldDescriptor, FieldResidue, MatchMapping - -def not_sent(_: FieldDescriptor) -> FieldResidue: +def not_sent(_: FieldDescriptor) -> Buffer: """ `not-sent` compression action (CA): do not send the field value as it's supposed known by the receiver """ - field_residue: FieldResidue = FieldResidue(residue=b'', length=0) + field_residue: Buffer = Buffer(content=b'', bit_length=0) return field_residue -def value_sent(field_descriptor: FieldDescriptor) -> FieldResidue: +def value_sent(field_descriptor: FieldDescriptor) -> Buffer: """ `value-sent` compression action (CA): send the original value ! important note in file header about integer residues: ! @@ -46,30 +45,22 @@ def value_sent(field_descriptor: FieldDescriptor) -> FieldResidue: at the SCHC compression residue step (see Section 7.2 of [1]), when all fields residues are computed. """ - field_value: bytes = field_descriptor.value - field_length: int = field_descriptor.length - - - - residue: bytes = field_value - - field_residue: FieldResidue = FieldResidue(residue=residue, length=field_length) + field_residue: Buffer = field_descriptor.value return field_residue -def mapping_sent(field_descriptor: FieldDescriptor, mapping: MatchMapping) -> FieldResidue: +def mapping_sent(field_descriptor: FieldDescriptor, mapping: MatchMapping) -> Buffer: """ - `mapping-sent`: send the index in the mapping, at decompression the target value stored in the reverse mapping at the index is used. + `mapping-sent`: send the index in the mapping, at decompression the target value stored + in the reverse mapping at the index is used. """ - field_value: bytes = field_descriptor.value - index: int = mapping.forward[field_value] - residue_length: int = mapping.index_length - residue_bytes: int = ceil(residue_length/8) - residue: bytes = index.to_bytes(residue_bytes, 'big') - field_residue: FieldResidue = FieldResidue(residue=residue, length=residue_length) + field_value: Buffer = field_descriptor.value + index: Buffer = mapping.forward[field_value] + + field_residue: Buffer = index return field_residue -def least_significant_bits(field_descriptor: FieldDescriptor, match_pattern_length: int) -> FieldResidue: +def least_significant_bits(field_descriptor: FieldDescriptor, bit_length: int) -> Buffer: """ `LSB`: send bits not included in the pattern matched by the `MSB(x)` Matching Operator. @@ -108,24 +99,23 @@ def least_significant_bits(field_descriptor: FieldDescriptor, match_pattern_leng and the byte-padded residue is : 0 0 0 0 1 1 1 0 """ - field_value: bytes = field_descriptor.value - field_length: int = field_descriptor.length + field_value: Buffer = field_descriptor.value - # assume pattern is matched, we rerieve the residue_length last bits. + # assume pattern is matched, we retrieve the residue_length last bits. residue: bytes = b'' - residue_length: int = field_length - match_pattern_length + residue_length: int = bit_length residue_full_bytes: int = residue_length // 8 if residue_full_bytes > 0: - residue = field_value[-(residue_length // 8):] + residue = field_value.content[-(residue_length // 8):] residue_leading_bits: int = residue_length % 8 if residue_leading_bits > 0: - residue_partial_byte: int = field_value[-(residue_length // 8 + 1)] + residue_partial_byte: int = field_value.content[-(residue_length // 8 + 1)] bitmask = (0xff >> (8-residue_leading_bits)) leading_bits_residue: int = residue_partial_byte & bitmask residue = leading_bits_residue.to_bytes(1, 'big') + residue - field_residue: FieldResidue = FieldResidue(residue=residue, length=residue_length) + field_residue: Buffer = Buffer(content=residue, bit_length=residue_length) return field_residue diff --git a/microschc/binary/__init__.py b/microschc/binary/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/microschc/binary/buffer.py b/microschc/binary/buffer.py new file mode 100644 index 0000000..31970d6 --- /dev/null +++ b/microschc/binary/buffer.py @@ -0,0 +1,390 @@ +""" +Buffer data model + +Data model for buffers of bits. + +The Buffer class is the data model to manipulate bits sequences. +It provides equivalent primitives for usual binary operations extended +to bits sequences spanning over multiple bytes, e.g. shift, equality. +On top of those binary operations, the Buffer data model enables left-padded +as well as right-padded bits sequences, provides concatenation and padding +adjustement out-of-the-box and allows indexing and slicing bit sequences +using slicing notations. +""" + +from enum import Enum + +class Padding(str, Enum): + LEFT = 'left' + RIGHT = 'right' + +class Buffer: + def __init__(self, content: bytes, bit_length:int, padding=Padding.LEFT) -> None: + self.content:bytes = content + self.bit_length:int = bit_length + self.padding:Padding = padding + self.padding_length:int = 8*len(self.content) - self.bit_length + + def _update_padding(self): + self.padding_length = 8*len(self.content) - self.bit_length + + def shift(self, shift: int, inplace=True): + ''' + shift buffer, eventually expanding it in the process. + negative value means shifting to the left, positive value to the right + ''' + + shifted_buffer: Buffer = Buffer(content=b'', bit_length=self.bit_length, padding=self.padding) + padding_length = self.padding_length + + temp_buffer_content = self.content + + if shift < 0: + # left shift + shift *= -1 + shifted_buffer.bit_length += shift + if self.padding == Padding.LEFT: + if shift > self.padding_length: + # left shift is larger than padding, prepend null bytes to buffer prior to shifting + # 10 bits left shift + # <|< < < < < < < <|< v | + # 7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0| (|) byte delimiter + # +|+ + + + + + + +|- 0 1 0 1 1 0 1| (-) existing padding + # |- - - - - - - 0|1 0 1 1 0 1 0 0|0 0 0 0 0 0 0 0| (+) extra bits required + + extra_bits_required: int = shift - padding_length + + extra_bytes_right: int = (shift // 8) + + temp_buffer_content += bytes(extra_bytes_right) + if extra_bits_required > 8 * extra_bytes_right: + temp_buffer_content = bytes(b'\x00') + temp_buffer_content + + shift %= 8 + shift_complement: int = (- shift)%8 + + # residual left shift is smaller than padding + # 10 bits left shift + # | < < v | | + # 7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0| (|) byte delimiter + # |- - - - - - - -|- - - - 1 0 0 0|0 0 1 0 1 1 0 1| (-) existing padding + # |- - - - - - - -|- - 1 0 0 0 0 0|1 0 1 1 0 1 0 0| (+) extra bits required + + # bit-level shift + for i in range(len(temp_buffer_content)-1): + shifted_buffer.content += (((temp_buffer_content[i] << shift) & 0xff) + (temp_buffer_content[i+1] >> shift_complement)).to_bytes(1, 'big') + shifted_buffer.content += ((temp_buffer_content[-1] << shift) & 0xff).to_bytes(1, 'big') + elif self.padding == Padding.RIGHT: + if shift > padding_length: + extra_bytes_right: int = (shift-padding_length) // 8 + temp_buffer_content += bytes(extra_bytes_right) + shifted_buffer.content = temp_buffer_content + elif shift > 0: + # right shift + if shift < self.bit_length: + shifted_buffer.bit_length -= shift + if self.padding == Padding.LEFT: + bytes_to_remove: int = shift // 8 + temp_buffer_content = temp_buffer_content[0:len(temp_buffer_content)-bytes_to_remove] + shift %= 8 + if shift > 0: + shift_complement = (-shift)% 8 + temp_buffer_content = bytes(b'\x00') + temp_buffer_content + for i in range(1, len(temp_buffer_content)): + shifted_buffer.content += (((temp_buffer_content[i-1]<>shift)).to_bytes(1, 'big') + else: + shifted_buffer.content = temp_buffer_content + elif self.padding == Padding.RIGHT: + shift_complement = (-shift)% 8 + temp_buffer_content = bytes(b'\x00') + temp_buffer_content + for i in range(1, len(temp_buffer_content)): + shifted_buffer.content += (((temp_buffer_content[i-1]<>shift)).to_bytes(1, 'big') + else: + shifted_buffer.content = temp_buffer_content + if inplace == True: + self.content = shifted_buffer.content + self.bit_length = shifted_buffer.bit_length + self._update_padding() + return self + else: + shifted_buffer._update_padding() + return shifted_buffer + + def trim(self, inplace=True): + self_byte_length: int = self.bit_length // 8 + self_offset: int = -self.bit_length % 8 + + if self.padding == Padding.LEFT: + self_content = self.content[-self_byte_length:] if self_byte_length > 0 else b"" + if self_offset != 0: + self_bit_mask: int = 0xff >> (self_offset%8) + self_content = (self.content[-(self_byte_length+1)] & self_bit_mask).to_bytes(1, 'big') + self_content + else: + self_content = self.content[:self_byte_length] + if self_offset != 0: + self_bit_mask: int = (0xff << (self_offset%8) & 0xff) + self_content += (self.content[self_byte_length] & self_bit_mask).to_bytes(1, 'big') + + if inplace == True: + self.content = self_content + self._update_padding() + return self + else: + new_buffer: Buffer = Buffer(content=self_content, bit_length=self.bit_length, padding=self.padding) + return new_buffer + + def pad(self, padding: Padding, inplace=True): + + if inplace == True and padding == self.padding: + return self + + if inplace == False and padding == self.padding: + return Buffer(content=self.content, bit_length=self.bit_length, padding=self.padding) + + self_copy: Buffer = self.copy() + + padding_length: int = self.padding_length + if padding == Padding.RIGHT: + # self padding is Padding.LEFT + # we shift left + shift_value = - padding_length + else: + # self padding is Padding.RIGHT + # we shift right + self_copy.bit_length = 8*len(self.content) + self_copy._update_padding() + self_copy.padding = padding + shift_value = + padding_length + + buffer: Buffer = self_copy.shift(shift=shift_value, inplace=True) + buffer.bit_length = self.bit_length + buffer.padding = padding + buffer._update_padding() + if inplace == True: + self.content = buffer.content + self.bit_length = buffer.bit_length + self.padding = buffer.padding + return buffer + + def copy(self): + return Buffer(content=self.content, bit_length= self.bit_length, padding=self.padding) + + def __eq__(self, another: object) -> bool: + ''' + returns True if `another` has the same first `bit_length` bits (excluding padding) + ''' + if isinstance(another, Buffer) is False: + return False + assert isinstance(another, Buffer) # for linter + if self.bit_length != another.bit_length: + return False + + trimmed_self = self.trim(inplace=False) + trimmed_another = another.trim(inplace=False) + + if self.padding != another.padding: + trimmed_another.pad(padding=self.padding, inplace=True) + + return trimmed_self.content == trimmed_another.content + + def __hash__(self) -> int: + trimmed_buffer: Buffer = self.trim(inplace=False) + return trimmed_buffer.content.__hash__() + + def __add__(self, another): + self_copy: Buffer = self.copy() + another_copy: Buffer = another.copy() + self_copy_offset = 0 + another_copy_offset = 0 + # remove excess padding + if self_copy.padding == Padding.RIGHT: + self_copy.trim(inplace=True) + self_copy_offset = self_copy.bit_length % 8 + if another_copy.padding == Padding.LEFT: + another_copy.trim(inplace=True) + another_copy_offset = (- another_copy.bit_length)%8 + + if another_copy_offset != self_copy_offset: + # need realignment of another_copy + shift:int = self_copy_offset - another_copy_offset + if shift > 0: + if another_copy.padding == Padding.LEFT: + another_copy.content += b'\x00' + another_copy.bit_length += 8 + elif another_copy.padding == Padding.RIGHT: + if shift <= another_copy.padding_length: + another_copy.bit_length += shift + else: + another_copy.bit_length += shift + another_copy.content += b'\x00' + + elif shift < 0: + if another_copy.padding == Padding.RIGHT or shift < - another_copy.padding_length: + another_copy.content = b'\x00' + another_copy.content + another_copy.bit_length += 8 + another_copy.shift(shift=shift, inplace=True) + + if self_copy_offset == 0: + self_copy.content += another_copy.content + else: + self_copy.content = self_copy.content[0:-1] + (self_copy.content[-1] + another_copy.content[0]).to_bytes(1, 'big') + another_copy.content[1:] + self_copy.bit_length += another.bit_length + self_copy._update_padding() + return self_copy + + def __getitem__(self, items): + + assert isinstance(items, (slice, int)) + + if isinstance(items, slice): + start_bit, stop_bit, _ = items.indices(self.bit_length) + + else: + start_bit = items + stop_bit = start_bit + 1 + + + subset_bit_length: int = stop_bit - start_bit + + if self.padding == Padding.LEFT: + # retrieve bits 1 to 10 + # + # 0x00 0x01 0x0d + # |- - - - - - - -| - - - 0 0 0 0 1|0 0 0 0 1 1 0 1| (-) padding (11 bits padding on the left) + # + + + + + + + + + (+) bits to output + # --> |- - - - - - 0 0|0 0 1 0 0 0 0 1| + # 0x00 0x41 + + + # |- - - - - - - -|- - - 0 0 0 0 1|0 0 0 0 1 1 0 1| + # ^ ^ + # start_bit stop_bit + # ^---------------^---------------^ + # start_byte stop_byte + start_bit += self.padding_length + stop_bit += self.padding_length + start_byte = start_bit//8 + stop_byte = stop_bit//8 if stop_bit%8 == 0 else 1 + stop_bit//8 + + # |- - - 0 0 0 0 1|0 0 0 0 0 1 0 1| subset_content + # + + + + + + + + + (+) bits to output + content_of_interest = self.content[start_byte:stop_byte] + subset_content = b'' + + # remove rightmost unneeded bits by right-shifting + # |- - - 0 0 0 0 1|0 0 0 0 1 1 0 1| subset_content + # + + + + + + + + + (+) bits to output + # |- - - - - - 0 0|0 0 1 0 0 0 0 1|x x x| + # ^ ^ + # start_bit stop_bit + # ^---------------^---------------^ + # start_byte stop_byte + shift: int = stop_byte * 8 - stop_bit + if shift > 0: + content_of_interest = b'\x00' + content_of_interest + shift_c: int = (-shift)%8 + bitmask: int = 0xff >> (-shift)%8 + for i in range(len(content_of_interest)-1, 0, -1): + byte_i = content_of_interest[i] + byte_i_1 = content_of_interest[i-1] + subset_content = (((byte_i_1 & bitmask) << shift_c) | (byte_i >> shift)).to_bytes(1, 'big') + subset_content + else: + subset_content = content_of_interest + + # remove leftmost unneeded bits using padding + # |- - - - - - - 0|0 1 0 0 0 0 0 1| + # ^ ^ + # start_bit stop_bit + # ^---------------^---------------^ + # start_byte stop_byte + subset: Buffer = Buffer(content=subset_content, bit_length=subset_bit_length, padding=self.padding) + + return subset + + else: + # retrieve bits 1 to 10 + # + # 0x08 0x68 0x00 + # |0 0 0 0 1 0 0 0|0 1 1 0 1 - - -|- - - - - - - -| (-) padding (11 bits padding on the right) + # + + + + + + + + + (+) bits to output + # --> |0 0 0 1 0 0 0 0|1 - - - - - - -| + # 0x10 0x80 + + # 0x08 0x68 0x00 + # |0 0 0 0 1 0 0 0|0 1 1 0 1 - - -|- - - - - - - -| (-) padding (11 bits padding on the right) + # + + + + + + + + + (+) bits to output + # ^ ^ + # start_bit stop_bit + # ^---------------^---------------^ + # start_byte stop_byte + start_byte = start_bit//8 + stop_byte = stop_bit//8 if stop_bit%8 == 0 else 1 + stop_bit//8 + + # 0x08 0x68 + # |0 0 0 0 1 0 0 0|0 1 1 0 1 - - -| content of interest + # + + + + + + + + + + # ^ ^ + # start_bit stop_bit + # ^---------------^---------------^ + content_of_interest = self.content[start_byte:stop_byte] + + # remove leftmost unneeded bits by shifting left + # 0x08 0x68 + # |0 0 0 0 1 0 0 0|0 1 1 0 1 - - -| content of interest + # + + + + + + + + + + # ^ ^ + # x|0 0 0 1 0 0 0 0|1 1 0 1 - - - -| content of interest + subset_content = b'' + + shift = (start_bit)%8 + shift_c: int = (-shift)%8 + if shift > 0: + shift_c: int = (-shift)%8 + content_of_interest += b'\x00' + for i in range(0, len(content_of_interest)-1): + byte_i = content_of_interest[i] + byte_i_p_1 = content_of_interest[i+1] + subset_content += (((byte_i << shift)& 0xff) | (byte_i_p_1 >> shift_c)).to_bytes(1, 'big') + else: + subset_content = content_of_interest + # and rightmost unneeded bits by padding + subset: Buffer = Buffer(content=subset_content, bit_length=subset_bit_length, padding=self.padding) + subset.trim(inplace=True) + return subset + + + def __repr__(self) -> str: + content_repr:str = "" + padding_length: int = self.padding_length + index:int = 0 + + if self.padding == Padding.LEFT and padding_length > 0: + padding_str: str = "" + if padding_length // 8 > 0: + index += padding_length//8 + padding_str += "-------- " * (padding_length//8) + padding_length %= 8 + padding_str += "-" * padding_length + format = f"0{8-padding_length}b" + content_repr += f"{padding_str}{self.content[index]:{format}} " + index += 1 + if self.bit_length < 65: + content_repr += " ".join([f"{b:08b}" for b in self.content[index:index + (self.bit_length//8)]]) + + else: + content_repr += " ".join([f"{b:08b}" for b in self.content[index:index + 4]]) + content_repr += " ... " + content_repr += " ".join([f"{b:08b}" for b in self.content[(self.bit_length//8) -4:self.bit_length//8]]) + + index += (self.bit_length//8) + if self.padding == Padding.RIGHT and padding_length > 0: + padding_str: str = " " + if padding_length // 8 > 0: + padding_str += "-------- " * (padding_length//8) + padding_length %= 8 + padding_str = "-" * padding_length + padding_str[:-1] + format = f"0{8-padding_length}b" + last_byte = self.content[index] >> padding_length + content_repr += f" {last_byte:{format}}{padding_str}" + + return f"[{content_repr}] | len: {self.bit_length} | pad: {self.padding_length} {self.padding}" diff --git a/microschc/compressor/compressor.py b/microschc/compressor/compressor.py index 4c50f4a..7e836c1 100644 --- a/microschc/compressor/compressor.py +++ b/microschc/compressor/compressor.py @@ -6,37 +6,35 @@ from typing import List, Tuple from microschc.actions.compression import least_significant_bits, mapping_sent, value_sent -from microschc.rfc8724 import FieldDescriptor, FieldResidue, MatchMapping, PacketDescriptor, Pattern, RuleDescriptor +from microschc.binary.buffer import Buffer, Padding +from microschc.rfc8724 import FieldDescriptor, MatchMapping, PacketDescriptor, RuleDescriptor from microschc.rfc8724 import CompressionDecompressionAction as CDA -def compress(packet_descriptor: PacketDescriptor, rule_descriptor: RuleDescriptor) -> Tuple[bytes, int]: +def compress(packet_descriptor: PacketDescriptor, rule_descriptor: RuleDescriptor) -> Buffer: """ Compress the packet fields following the rule's compression actions. See section 7.2 of [1]. """ - schc_packet: bytes = b'' - schc_packet_length: int = 0 + schc_packet: Buffer = Buffer(content=b'', bit_length=0, padding=Padding.RIGHT) - rule_id: bytes = rule_descriptor.id - rule_id_length: int = rule_descriptor.id_length + rule_id: Buffer = rule_descriptor.id - schc_packet = _compact_left(buffer=schc_packet, buffer_length=schc_packet_length, bytefield=rule_id, bytefield_length=rule_id_length) - schc_packet_length += rule_id_length + schc_packet += rule_id packet_fields: List[FieldDescriptor] = [] for header_descriptor in packet_descriptor.headers: - header_fields: List[FieldDescriptor] = [FieldDescriptor(id=f.id, length=f.length, position=f.position, value=f.value ) for f in header_descriptor.fields] + header_fields: List[FieldDescriptor] = [FieldDescriptor(id=f.id, position=f.position, value=f.value ) for f in header_descriptor.fields] packet_fields += header_fields for pf, rf in zip(packet_fields, rule_descriptor.field_descriptors): - field_residue: FieldResidue + field_residue: Buffer if rf.compression_decompression_action == CDA.NOT_SENT: continue elif rf.compression_decompression_action == CDA.LSB: - assert isinstance(rf.target_value, Pattern) - field_residue = least_significant_bits(field_descriptor=pf, match_pattern_length=rf.target_value.length) + assert isinstance(rf.target_value, Buffer) + field_residue = least_significant_bits(field_descriptor=pf, bit_length=pf.value.bit_length - rf.target_value.bit_length) elif rf.compression_decompression_action == CDA.MAPPING_SENT: assert isinstance(rf.target_value, MatchMapping) field_residue = mapping_sent(field_descriptor=pf, mapping=rf.target_value) @@ -44,16 +42,14 @@ def compress(packet_descriptor: PacketDescriptor, rule_descriptor: RuleDescripto field_residue = value_sent(field_descriptor=pf) if rf.compression_decompression_action in {CDA.LSB, CDA.VALUE_SENT} and rf.length == 0: - encoded_length, encoded_length_length = _encode_length(field_residue.length) - schc_packet = _compact_left(buffer=schc_packet, buffer_length=schc_packet_length, bytefield=encoded_length, bytefield_length=encoded_length_length) - schc_packet_length += encoded_length_length + encoded_length: Buffer = _encode_length(field_residue.bit_length) + schc_packet += encoded_length - schc_packet = _compact_left(buffer=schc_packet, buffer_length=schc_packet_length, bytefield=field_residue.residue, bytefield_length=field_residue.length) - schc_packet_length += field_residue.length + schc_packet += field_residue - return schc_packet, schc_packet_length + return schc_packet -def _encode_length(length:int) -> Tuple[bytes, int]: +def _encode_length(length:int) -> Buffer: ''' Encode the length value following instructions in section 7.4.2 of [1]. ''' @@ -69,51 +65,4 @@ def _encode_length(length:int) -> Tuple[bytes, int]: else: encoded_length_value = b'\x0f\xff' + length.to_bytes(2, 'big') encoded_length_length = 28 - return (encoded_length_value, encoded_length_length) - -def _compact_left(buffer: bytes, buffer_length:int, bytefield: bytes, bytefield_length: int) -> bytes: - ''' - concatenate buffer and bytefield after removing leading zero-padding - ''' - buffer_offset: int = buffer_length % 8 - bytefield_offset: int = - bytefield_length % 8 - - last_byte: int = buffer[-1] if len(buffer) > 0 else 0 - - bytefield_aligned: bytes = b'' - - # need realignment of bytefield - if bytefield_offset != buffer_offset: - # need realignment of bytefield - if buffer_offset > bytefield_offset: - # bytefield is shifted right - shift: int = buffer_offset - bytefield_offset - shift_complement: int = 8 - shift - bytefield_aligned = ((bytefield[-1] << shift_complement) & 0xff).to_bytes(1, 'big') - - # note: bytefields provided by the parser are zero-padded on the left (right-packed) - bitmask: int = 0xff >> shift_complement - - for i in range(len(bytefield)-1, 0, -1): - right_part:int = bytefield[i] >> shift - left_part:int = (bytefield[i-1] << shift_complement)& 0xff - bytefield_aligned = (left_part + right_part).to_bytes(1, 'big') + bytefield_aligned - bytefield_aligned = (bytefield[0] >> shift).to_bytes(1, 'big') + bytefield_aligned - else: - # bytefield is shifted left - shift: int = bytefield_offset - buffer_offset - shift_complement: int = 8 - shift - #bitmask: int = (0xff << shift_complement) - for i in range(len(bytefield)-1): - bytefield_aligned += (((bytefield[i] << shift) & 0xff) + (bytefield[i+1] >> shift_complement)).to_bytes(1, 'big') - bytefield_aligned += ((bytefield[-1] << shift) & 0xff).to_bytes(1, 'big') - else: - # no need for realignment - bytefield_aligned = bytefield - - if buffer_offset == 0: - buffer += bytefield_aligned - else: - buffer = buffer[0:-1] + (last_byte + bytefield_aligned[0]).to_bytes(1, 'big') + bytefield_aligned[1:] - - return buffer \ No newline at end of file + return Buffer(content=encoded_length_value, bit_length=encoded_length_length) \ No newline at end of file diff --git a/microschc/decompressor/__init__.py b/microschc/decompressor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/microschc/decompressor/decompressor.py b/microschc/decompressor/decompressor.py new file mode 100644 index 0000000..9f4c8ba --- /dev/null +++ b/microschc/decompressor/decompressor.py @@ -0,0 +1,101 @@ +''' +Implementation SCHC packet decompression as described in section 7.2 of [1]. + +[1] "SCHC: Generic Framework for Static Context Header Compression and Fragmentation" , A. Minaburo et al. +''' + +from microschc.binary.buffer import Buffer, Padding +from microschc.rfc8724 import DirectionIndicator, MatchMapping, RuleDescriptor +from microschc.rfc8724 import CompressionDecompressionAction as CDA + + + +def decompress(schc_packet: Buffer, direction: DirectionIndicator, rule: RuleDescriptor) -> Buffer: + """ + Decompress the packet fields following the rule's compression actions. + See section 7.2 of [1]. + """ + decompressed: Buffer = Buffer(content=b'', bit_length=0,padding=Padding.RIGHT) + + # remove rule ID + schc_packet = schc_packet[rule.id.bit_length:] + + # decompress all fields + field_residue: Buffer + residue_bitlength: int + decompressed_field: Buffer + for rf in rule.field_descriptors: + residue_bitlength = 0 + decompressed_field = Buffer(content=b'', bit_length=0, padding=Padding.RIGHT) + if rf.compression_decompression_action == CDA.NOT_SENT: + decompressed_field += rf.target_value + elif rf.compression_decompression_action == CDA.LSB: + assert isinstance(rf.target_value, Buffer) + lsb_bitlength: int = rf.length-rf.target_value.bit_length + field_residue = schc_packet[:lsb_bitlength] + decompressed_field += rf.target_value + decompressed_field += field_residue + residue_bitlength = lsb_bitlength + elif rf.compression_decompression_action == CDA.MAPPING_SENT: + assert isinstance(rf.target_value, MatchMapping) + for key, value in rf.target_value.reverse.items(): + if key == schc_packet[0:key.bit_length]: + field_residue = key + decompressed_field += value + residue_bitlength = key.bit_length + break + elif rf.compression_decompression_action == CDA.VALUE_SENT: + assert isinstance(rf.target_value, Buffer) + if rf.length != 0: + field_residue = schc_packet[0:rf.length] + decompressed_field += field_residue + residue_bitlength = rf.length + else: + # variable field encoded length + length_buffer: Buffer = schc_packet[0:4] + length_buffer.pad(padding=Padding.LEFT, inplace=True) + encoded_length_value: int = int.from_bytes(length_buffer.content, 'big') + if encoded_length_value < 15: + decompressed_field += schc_packet[4:4+encoded_length_value] + residue_bitlength = 4 + encoded_length_value + else: + length_buffer = schc_packet[4:12] + length_buffer.pad(padding=Padding.LEFT, inplace=True) + encoded_length_value: int = int.from_bytes(length_buffer.content, 'big') + if encoded_length_value < 255: + decompressed_field += schc_packet[12:12+encoded_length_value] + residue_bitlength = 12 + encoded_length_value + else: + length_buffer = schc_packet[12:28] + length_buffer.pad(padding=Padding.LEFT, inplace=True) + encoded_length_value: int = int.from_bytes(length_buffer.content, 'big') + decompressed_field += schc_packet[28:28+encoded_length_value] + residue_bitlength = 28 + encoded_length_value + + decompressed += decompressed_field + + schc_packet = schc_packet[residue_bitlength:] + + # concatenate the rest of the SCHC payload + decompressed += schc_packet + + + return decompressed + + + + + + # encoded_length_value: bytes + # encoded_length_length: int + # assert length < 2**16 + # if length < 15: + # encoded_length_value = length.to_bytes(1, 'big') + # encoded_length_length = 4 + # elif length < 255: + # encoded_length_value = b'\x0f' + length.to_bytes(1, 'big') + # encoded_length_length = 12 + # else: + # encoded_length_value = b'\x0f\xff' + length.to_bytes(2, 'big') + # encoded_length_length = 28 + # return Buffer(content=encoded_length_value, bit_length=encoded_length_length) \ No newline at end of file diff --git a/microschc/matching/operators.py b/microschc/matching/operators.py index 53b078b..d910160 100644 --- a/microschc/matching/operators.py +++ b/microschc/matching/operators.py @@ -11,10 +11,11 @@ [1] "RFC 8724 SCHC: Generic Framework for Static Context Header Compression and Fragmentation" , A. Minaburo et al. """ +from microschc.binary.buffer import Buffer from microschc.rfc8724 import FieldDescriptor, Mapping, MatchMapping -def equal(field_descriptor: FieldDescriptor, target_value: bytes) -> bool: +def equal(field_descriptor: FieldDescriptor, target_value: Buffer) -> bool: """ `equal` matching operator: the match result is True if the field value in the packet matches the target value @@ -28,7 +29,7 @@ def ignore(field_descriptor: FieldDescriptor) -> bool: """ return True -def most_significant_bits(field_descriptor: FieldDescriptor, pattern: bytes, pattern_length: int) -> bool: +def most_significant_bits(field_descriptor: FieldDescriptor, pattern: Buffer) -> bool: """ `MSB(x)` matching operator: the match result is True if the `pattern_length` most significant (leftmost) bits of the field value equal @@ -40,60 +41,10 @@ def most_significant_bits(field_descriptor: FieldDescriptor, pattern: bytes, pat are on the first byte of the representation. Colloquially speaking, the raw field is packed on the right when parsed. - we also assume that the pattern is provided as bytes and that it is left-padded, if necessary. - - Suppose the following field value: b'\x33\xff\x23\xdb\xda' of length 38 bits, pattern: b'\x01\x9f\xf9' of length 17 bits - field value - |---------------------------------------------------------------------------| - 0x33 0xff 0x23 0xdb 0xda - 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 0 1 0 0 0 1 1 1 1 0 1 1 0 1 1 1 1 0 1 1 0 1 0 - | byte0 | byte1 | byte2 | byte3 | byte4 | - |--------------pattern------------|-----------------residue-----------------| - | [...] 1| byte1 | byte2 | - 0x01 0x9f 0xf9 - - The expected residue is: - 0x3 0xdb 0xda - 0 0 0 0 0 0 1 1 1 1 0 1 1 0 1 1 1 1 0 1 1 0 1 0 - | byte2 | byte3 | byte4 | - | 0s | | + we also assume that the pattern is provided as bytes and that it is left-padded, if necessary. """ - - field_value: bytes = field_descriptor.value - field_length: int = field_descriptor.length - - residue_length: int = field_length - pattern_length - residue_fullbytes: int = residue_length // 8 - - most_significant_bits: bytes = field_value[:-residue_fullbytes] # bytes 0-2 - residue_alignment: int = residue_length % 8 - if residue_alignment > 0: - left_shift: int = 8 - residue_alignment - region_of_interest: bytes = most_significant_bits - region_of_interest_length = len(region_of_interest) - # region of interest - # |-------------------------------------| - # 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 0 1 0 0 0 1 1 - # | byte0 | byte1 | byte2 | - - # pattern - # 0 0 0 0 0 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 0 1 - #| byte0 | byte1 | byte2 | - # 0x01 0x9f 0xf9 - - - most_significant_bits: bytes = b'' - bitmask: int = 0xff >> left_shift - for i in range(region_of_interest_length-1, 0,-1): - right_part: int = region_of_interest[i] >> residue_alignment - left_part: int = (region_of_interest[i-1] & bitmask) << left_shift - byte_realigned = left_part + right_part - most_significant_bits = byte_realigned.to_bytes(1, 'big') + most_significant_bits - - # leading bits of region of interest - leading_bits = region_of_interest[0] >> residue_alignment - most_significant_bits = leading_bits.to_bytes(1, 'big') + most_significant_bits - + field_value: Buffer = field_descriptor.value + most_significant_bits = field_value.shift(shift=(field_value.bit_length-pattern.bit_length), inplace=False) return most_significant_bits == pattern def match_mapping(field_descriptor: FieldDescriptor, target_values: MatchMapping) -> bool: diff --git a/microschc/parser/parser.py b/microschc/parser/parser.py index bef66f5..c169614 100644 --- a/microschc/parser/parser.py +++ b/microschc/parser/parser.py @@ -1,7 +1,6 @@ from typing import List from microschc.rfc8724 import DirectionIndicator, FieldDescriptor, HeaderDescriptor, PacketDescriptor -from microschc.rfc8724extras import ParserDefinitions - +from microschc.binary.buffer import Buffer class HeaderParser: """Abstract Base Class for header parsers. @@ -44,7 +43,7 @@ def parse(self, buffer: bytes, direction: DirectionIndicator) -> PacketDescripto pass buffer = buffer[bytes_consumed:] - packet_descriptor: PacketDescriptor = PacketDescriptor(direction=direction, headers=header_descriptors, payload=buffer) + packet_descriptor: PacketDescriptor = PacketDescriptor(direction=direction, headers=header_descriptors, payload=Buffer(content=buffer, bit_length=8*len(buffer))) return packet_descriptor diff --git a/microschc/parser/protocol/coap.py b/microschc/parser/protocol/coap.py index 5b99fd8..b5253ca 100644 --- a/microschc/parser/protocol/coap.py +++ b/microschc/parser/protocol/coap.py @@ -23,6 +23,7 @@ from enum import Enum from typing import List, Tuple +from microschc.binary.buffer import Buffer from microschc.parser import HeaderParser from microschc.rfc8724 import FieldDescriptor, HeaderDescriptor @@ -106,12 +107,12 @@ def parse(self, buffer: bytes) -> HeaderDescriptor: token: bytes = header_bytes[4: 4+token_length_int] header_fields: List[FieldDescriptor] = [ - FieldDescriptor(id=CoAPFields.VERSION, length=2, position=0, value=version), - FieldDescriptor(id=CoAPFields.TYPE, length=2, position=0, value=type), - FieldDescriptor(id=CoAPFields.TOKEN_LENGTH, length=4, position=0, value=token_length), - FieldDescriptor(id=CoAPFields.CODE, length=8, position=0, value=code), - FieldDescriptor(id=CoAPFields.MESSAGE_ID, length=16, position=0, value=message_id), - FieldDescriptor(id=CoAPFields.TOKEN, length=token_length_int*8, position=0, value=token), + FieldDescriptor(id=CoAPFields.VERSION, position=0, value=Buffer(content=version, bit_length=2)), + FieldDescriptor(id=CoAPFields.TYPE, position=0, value=Buffer(content=type, bit_length=2)), + FieldDescriptor(id=CoAPFields.TOKEN_LENGTH, position=0, value=Buffer(content=token_length, bit_length=4)), + FieldDescriptor(id=CoAPFields.CODE, position=0, value=Buffer(content=code, bit_length=8)), + FieldDescriptor(id=CoAPFields.MESSAGE_ID, position=0, value=Buffer(content=message_id, bit_length=16)), + FieldDescriptor(id=CoAPFields.TOKEN, position=0, value=Buffer(content=token, bit_length=token_length_int*8)), ] options_bytes: bytes = buffer[4+token_length_int:] @@ -160,12 +161,12 @@ def _parse_options(buffer: bytes) -> Tuple[List[FieldDescriptor], int]: # option_delta: 4 bits option_delta: bytes = ((option_bytes[0] & 0xf0) >> 4).to_bytes(1, 'big') - fields.append(FieldDescriptor(id=CoAPFields.OPTION_DELTA, length=4, position=option_index, value=option_delta)) + fields.append(FieldDescriptor(id=CoAPFields.OPTION_DELTA, position=option_index, value=Buffer(content=option_delta, bit_length=4))) # option_length: 4 bits option_length_int: int = option_bytes[0] & 0x0f option_length: bytes = option_length_int.to_bytes(1, 'big') - fields.append(FieldDescriptor(id=CoAPFields.OPTION_LENGTH, length=4, position=option_index, value=option_length)) + fields.append(FieldDescriptor(id=CoAPFields.OPTION_LENGTH, position=option_index, value=Buffer(content=option_length, bit_length=4))) # option_length_extended: option_length_extended_int: int = 0 @@ -174,40 +175,40 @@ def _parse_options(buffer: bytes) -> Tuple[List[FieldDescriptor], int]: if option_delta == CoAPDefinitions.OPTION_DELTA_EXTENDED_8BITS: # option_delta_extended: 8 bits - option_delta_extended: int = option_bytes[1] - fields.append(FieldDescriptor(id=CoAPFields.OPTION_DELTA_EXTENDED, length=8, position=option_index, value=option_delta_extended)) + option_delta_extended: bytes = option_bytes[1:2] + fields.append(FieldDescriptor(id=CoAPFields.OPTION_DELTA_EXTENDED, position=option_index, value=Buffer(content=option_delta_extended, bit_length=8))) option_offset = 2 elif option_delta == CoAPDefinitions.OPTION_DELTA_EXTENDED_16BITS: # option_delta_extended: 16 bits - option_delta_extended: int = (option_bytes[1] << 8) & option_bytes[2] - fields.append(FieldDescriptor(id=CoAPFields.OPTION_DELTA_EXTENDED, length=16, position=option_index, value=option_delta_extended)) + option_delta_extended: bytes = option_bytes[1:3] + fields.append(FieldDescriptor(id=CoAPFields.OPTION_DELTA_EXTENDED, position=option_index, value=Buffer(content=option_delta_extended, bit_length=16))) option_offset = 3 if option_length == CoAPDefinitions.OPTION_LENGTH_EXTENDED_8BITS: # option_length_extended: 8 bits option_length_extended_int: int = option_bytes[option_offset] option_length_extended: bytes = option_length_extended_int.to_bytes(1, 'big') - fields.append(FieldDescriptor(id=CoAPFields.OPTION_LENGTH_EXTENDED, length=8, position=option_index, value=option_length_extended)) + fields.append(FieldDescriptor(id=CoAPFields.OPTION_LENGTH_EXTENDED, position=option_index, value=Buffer(content=option_length_extended, bit_length=8))) option_offset += 1 elif option_length == CoAPDefinitions.OPTION_LENGTH_EXTENDED_16BITS: # option_length_extended: 16 bits option_length_extended_int: int = (option_bytes[option_offset] << 8) & option_bytes[option_offset+1] - option_length_extended: bytes = option_length_extended_int.to_bytes(1, 'big') - fields.append(FieldDescriptor(id=CoAPFields.OPTION_LENGTH_EXTENDED, length=16, position=option_index, value=option_length_extended)) + option_length_extended: bytes = option_bytes[option_offset:option_offset+2] + fields.append(FieldDescriptor(id=CoAPFields.OPTION_LENGTH_EXTENDED, position=option_index, value=Buffer(content=option_length_extended, bit_length=16))) option_offset += 2 option_value_length = option_length_int + option_length_extended_int option_value: bytes = option_bytes[option_offset: option_offset+option_value_length] - fields.append(FieldDescriptor(id=CoAPFields.OPTION_VALUE, length=option_value_length*8, position=option_index, value=option_value)) + fields.append(FieldDescriptor(id=CoAPFields.OPTION_VALUE, position=option_index, value=Buffer(content=option_value, bit_length=option_value_length*8))) option_offset += option_value_length cursor += option_offset # append payload marker field cursor += 1 - fields.append(FieldDescriptor(id=CoAPFields.PAYLOAD_MARKER, length=8, position=0, value=b'\xff')) + fields.append(FieldDescriptor(id=CoAPFields.PAYLOAD_MARKER, position=0, value=Buffer(content=b'\xff', bit_length=8))) # return CoAP fields descriptors list return (fields, 8*cursor) \ No newline at end of file diff --git a/microschc/parser/protocol/ipv6.py b/microschc/parser/protocol/ipv6.py index 6344efd..a5a5cc8 100644 --- a/microschc/parser/protocol/ipv6.py +++ b/microschc/parser/protocol/ipv6.py @@ -12,6 +12,7 @@ """ from enum import Enum +from microschc.binary.buffer import Buffer from microschc.parser import HeaderParser from microschc.rfc8724 import FieldDescriptor, HeaderDescriptor @@ -66,11 +67,11 @@ def parse(self, buffer:bytes) -> HeaderDescriptor: # flow label: 20 bits flow_label:bytes = ((header_bytes[1] & 0xf0) >> 4).to_bytes(1, 'big') + header_bytes[2:4] # payload length: 16 bits - payload_length:bytes = ((header_bytes[4] << 8) | (header_bytes[5])).to_bytes(2, 'big') + payload_length:bytes = header_bytes[4:6] # next header: 8 bits - next_header:bytes = (header_bytes[6]).to_bytes(1, 'big') + next_header:bytes = header_bytes[6:7] # hop limit: 8 bits - hop_limit:bytes = ((header_bytes[7])).to_bytes(1, 'big') + hop_limit:bytes = header_bytes[7:8] # source address: 128 bits (16 bytes) source_address:bytes = header_bytes[8:24] # destination address: 128 bits (16 bytes) @@ -80,14 +81,14 @@ def parse(self, buffer:bytes) -> HeaderDescriptor: id=IPv6_HEADER_ID, length=40*8, fields=[ - FieldDescriptor(id=IPv6Fields.VERSION, length=4, position=0, value=version), - FieldDescriptor(id=IPv6Fields.TRAFFIC_CLASS, length=8, position=0, value=traffic_class), - FieldDescriptor(id=IPv6Fields.FLOW_LABEL, length=20, position=0, value=flow_label), - FieldDescriptor(id=IPv6Fields.PAYLOAD_LENGTH, length=16, position=0, value=payload_length), - FieldDescriptor(id=IPv6Fields.NEXT_HEADER, length=8, position=0, value=next_header), - FieldDescriptor(id=IPv6Fields.HOP_LIMIT, length=8, position=0, value=hop_limit), - FieldDescriptor(id=IPv6Fields.SRC_ADDRESS, length=128, position=0, value=source_address), - FieldDescriptor(id=IPv6Fields.DST_ADDRESS, length=128, position=0, value=destination_address) + FieldDescriptor(id=IPv6Fields.VERSION, position=0, value=Buffer(content=version, bit_length=4)), + FieldDescriptor(id=IPv6Fields.TRAFFIC_CLASS, position=0, value=Buffer(content=traffic_class, bit_length=8)), + FieldDescriptor(id=IPv6Fields.FLOW_LABEL, position=0, value=Buffer(content=flow_label, bit_length=20)), + FieldDescriptor(id=IPv6Fields.PAYLOAD_LENGTH, position=0, value=Buffer(content=payload_length, bit_length=16)), + FieldDescriptor(id=IPv6Fields.NEXT_HEADER, position=0, value=Buffer(content=next_header, bit_length=8)), + FieldDescriptor(id=IPv6Fields.HOP_LIMIT, position=0, value=Buffer(content=hop_limit, bit_length=8)), + FieldDescriptor(id=IPv6Fields.SRC_ADDRESS, position=0, value=Buffer(content=source_address, bit_length=128)), + FieldDescriptor(id=IPv6Fields.DST_ADDRESS, position=0, value=Buffer(content=destination_address, bit_length=128)) ] ) return header_descriptor diff --git a/microschc/parser/protocol/udp.py b/microschc/parser/protocol/udp.py index 349f14f..85f0f9d 100644 --- a/microschc/parser/protocol/udp.py +++ b/microschc/parser/protocol/udp.py @@ -10,6 +10,7 @@ from enum import Enum from microschc.parser import HeaderParser from microschc.rfc8724 import FieldDescriptor, HeaderDescriptor +from microschc.binary.buffer import Buffer UDP_HEADER_ID = 'UDP' @@ -57,10 +58,10 @@ def parse(self, buffer:bytes) -> HeaderDescriptor: id=UDP_HEADER_ID, length=8*8, fields=[ - FieldDescriptor(id=UDPFields.SOURCE_PORT, length=16, position=0, value=source_port), - FieldDescriptor(id=UDPFields.DESTINATION_PORT, length=16, position=0, value=destination_port), - FieldDescriptor(id=UDPFields.LENGTH, length=16, position=0, value=length), - FieldDescriptor(id=UDPFields.CHECKSUM, length=16, position=0, value=checksum), + FieldDescriptor(id=UDPFields.SOURCE_PORT, position=0, value=Buffer(content=source_port, bit_length=16)), + FieldDescriptor(id=UDPFields.DESTINATION_PORT, position=0, value=Buffer(content=destination_port, bit_length=16)), + FieldDescriptor(id=UDPFields.LENGTH, position=0, value=Buffer(content=length, bit_length=16)), + FieldDescriptor(id=UDPFields.CHECKSUM, position=0, value=Buffer(content=checksum, bit_length=16)), ] ) return header_descriptor diff --git a/microschc/rfc8724.py b/microschc/rfc8724.py index 2b47b36..66bb379 100644 --- a/microschc/rfc8724.py +++ b/microschc/rfc8724.py @@ -8,23 +8,20 @@ from dataclasses import dataclass from typing import Dict, List, Union +from microschc.binary.buffer import Buffer -@dataclass -class Pattern: - pattern: bytes - length: int -ReverseMapping = Dict[int, bytes] -Mapping = Dict[bytes, int] + +ReverseMapping = Dict[Buffer, Buffer] +Mapping = Dict[Buffer, Buffer] class MatchMapping: - def __init__(self, index_length: int, forward_mapping: Mapping): - self.index_length: int = index_length + def __init__(self, forward_mapping: Mapping): self.forward: Mapping = forward_mapping self.reverse: ReverseMapping = {v: k for k, v in self.forward.items()} -TargetValue = Union[bytes, Pattern, MatchMapping] +TargetValue = Union[Buffer, MatchMapping] class DirectionIndicator(str, Enum): UP = 'Up' @@ -48,9 +45,9 @@ class CompressionDecompressionAction(str, Enum): @dataclass class FieldDescriptor: id: str - length: int + value: Buffer position: int - value: bytes + @dataclass @@ -64,14 +61,9 @@ class HeaderDescriptor: class PacketDescriptor: direction: DirectionIndicator headers: List[HeaderDescriptor] - payload: bytes + payload: Buffer -@dataclass -class FieldResidue: - residue: bytes - length: int - @dataclass class RuleFieldDescriptor: id: str @@ -84,6 +76,5 @@ class RuleFieldDescriptor: @dataclass class RuleDescriptor: - id: bytes - id_length: int + id: Buffer field_descriptors: List[RuleFieldDescriptor] diff --git a/microschc/ruler/ruler.py b/microschc/ruler/ruler.py index 3562710..0e627f2 100644 --- a/microschc/ruler/ruler.py +++ b/microschc/ruler/ruler.py @@ -15,9 +15,10 @@ """ from typing import List +from microschc.binary.buffer import Buffer from microschc.matching.operators import equal, ignore, match_mapping, most_significant_bits -from microschc.rfc8724 import DirectionIndicator, FieldDescriptor, MatchMapping, Pattern, MatchingOperator, PacketDescriptor, RuleDescriptor, RuleFieldDescriptor, TargetValue +from microschc.rfc8724 import DirectionIndicator, FieldDescriptor, MatchMapping, MatchingOperator, PacketDescriptor, RuleDescriptor, RuleFieldDescriptor, TargetValue class Ruler: @@ -25,14 +26,14 @@ class Ruler: def __init__(self, rules_descriptors: List[RuleDescriptor]) -> None: self.rules: List[RuleDescriptor] = rules_descriptors - def match(self, packet_descriptor: PacketDescriptor) -> RuleDescriptor: + def match_packet_descriptor(self, packet_descriptor: PacketDescriptor) -> RuleDescriptor: """ Find a rule matching the packet descriptor """ packet_fields: List[FieldDescriptor] = [] for header_descriptor in packet_descriptor.headers: - header_fields: List[FieldDescriptor] = [FieldDescriptor(id=f.id, length=f.length, position=f.position, value=f.value ) for f in header_descriptor.fields] + header_fields: List[FieldDescriptor] = [FieldDescriptor(id=f.id, value=f.value, position=f.position) for f in header_descriptor.fields] packet_fields += header_fields packet_direction: DirectionIndicator = packet_descriptor.direction @@ -55,6 +56,23 @@ def match(self, packet_descriptor: PacketDescriptor) -> RuleDescriptor: return rule # if no rule matches, use the default return self.rules[-1] + + def match_schc_packet(self, schc_packet: Buffer) -> RuleDescriptor: + ''' + find a rule matching the rule ID of a SCHC packet + ''' + matching_rule: RuleFieldDescriptor + # iterate though rules and try matching the rule ID with SCHC packet beginning + for rule in self.rules: + rule_id: Buffer = rule.id + if rule_id.bit_length > schc_packet.bit_length: + continue + if rule_id == schc_packet[0:rule_id.bit_length]: + return rule + + # if no rule matched, return default + return self.rules[-1] + def _field_match(packet_field: FieldDescriptor, rule_field: RuleFieldDescriptor): # basic test: field IDs and length match @@ -66,13 +84,15 @@ def _field_match(packet_field: FieldDescriptor, rule_field: RuleFieldDescriptor) return ignore(packet_field) elif rule_field.matching_operator == MatchingOperator.EQUAL: - assert isinstance(rule_field.target_value, bytes) - return packet_field.length == rule_field.length and equal(packet_field, rule_field.target_value) + assert isinstance(rule_field.target_value, Buffer) + return packet_field.value == rule_field.target_value elif rule_field.matching_operator == MatchingOperator.MSB: pattern: TargetValue = rule_field.target_value - assert isinstance(pattern, Pattern) - return most_significant_bits(packet_field, pattern=pattern.pattern, pattern_length=pattern.length) + if (rule_field.length != packet_field.value.bit_length): + return False + assert isinstance(pattern, Buffer) + return most_significant_bits(packet_field, pattern=pattern) elif rule_field.matching_operator == MatchingOperator.MATCH_MAPPING: mapping: TargetValue = rule_field.target_value diff --git a/tests/actions/test_compression.py b/tests/actions/test_compression.py index 6367119..6b13155 100644 --- a/tests/actions/test_compression.py +++ b/tests/actions/test_compression.py @@ -1,6 +1,7 @@ from math import ceil from microschc.actions.compression import not_sent, value_sent, mapping_sent, least_significant_bits -from microschc.rfc8724 import FieldDescriptor, FieldResidue, Mapping, MatchMapping +from microschc.binary.buffer import Buffer +from microschc.rfc8724 import FieldDescriptor, Mapping, MatchMapping SOME_ID = 'ID' @@ -9,15 +10,13 @@ def test_not_sent(): """test: `not-sent` compression action Test that residue is of size 0 and empty """ + + field_descriptor: FieldDescriptor = FieldDescriptor(id=SOME_ID, value=Buffer(content=b'\xff', bit_length=8), position=0) - # test on integer values - integer_target_value = 13 - integer_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, length=16, position=0, value=integer_target_value) - - field_residue: FieldResidue = not_sent(integer_field) + field_residue: Buffer = not_sent(field_descriptor) - assert field_residue.length == 0 - assert field_residue.residue == b'' + assert field_residue.bit_length == 0 + assert field_residue.content == b'' def test_value_sent(): """test: `value-sent` compression action @@ -26,10 +25,10 @@ def test_value_sent(): # test on bytes value bytes_value = b'\x13\xff' - bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, length=16, position=0, value=bytes_value) - field_residue: FieldResidue = value_sent(field_descriptor=bytes_field) - assert field_residue.residue == bytes_value - assert field_residue.length == bytes_field.length + bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, position=0, value=Buffer(content=bytes_value, bit_length=16)) + field_residue: Buffer = value_sent(field_descriptor=bytes_field) + assert field_residue.content == bytes_value + assert field_residue.bit_length == bytes_field.value.bit_length def test_mapping_sent(): """test: `mapping-sent` compression action @@ -37,15 +36,17 @@ def test_mapping_sent(): """ # test on bytes value - bytes_value = b'\x13\xff' - bytes_forward_mapping: Mapping = {b'\xff': 1, b'\x13\xff':2} - index_length: int = 3 - - bytes_match_mapping : MatchMapping = MatchMapping(index_length=index_length, forward_mapping=bytes_forward_mapping) - bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, length=16, position=0, value=bytes_value) - field_residue: FieldResidue = mapping_sent(field_descriptor=bytes_field, mapping=bytes_match_mapping) - assert int.from_bytes(field_residue.residue, 'big') == 2 - assert field_residue.length == index_length + bytes_value = Buffer(content=b'\x13\xff', bit_length=16) + bytes_forward_mapping: Mapping = { + Buffer(content=b'\xff', bit_length=8): Buffer(content=b'\x01', bit_length=3), + Buffer(content=b'\x13\xff', bit_length=16): Buffer(content=b'\x02', bit_length=3) + } + + bytes_match_mapping : MatchMapping = MatchMapping(forward_mapping=bytes_forward_mapping) + bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, position=0, value=bytes_value) + field_residue: Buffer = mapping_sent(field_descriptor=bytes_field, mapping=bytes_match_mapping) + assert field_residue.content == b'\x02' + assert field_residue.bit_length == 3 def test_least_significant_bits(): """test: `LSB` compression action @@ -69,18 +70,14 @@ def test_least_significant_bits(): """ - pattern: bytes = b'\x01\x9f\xf9' - pattern_length: int = 17 # in bits + pattern: Buffer = Buffer(content=b'\x01\x9f\xf9', bit_length=17) - field_value: bytes = b'\x33\xff\x23\xdb\xda' - field_length: int = 38 # in bits - + field_value: Buffer = Buffer(content=b'\x33\xff\x23\xdb\xda', bit_length=38) - expected_residue_length: int = field_length - pattern_length - expected_residue: bytes = b'\x03\xdb\xda' + expected_residue: Buffer = Buffer(content=b'\x03\xdb\xda', bit_length=field_value.bit_length - pattern.bit_length) - bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, length=field_length, position=0, value=field_value) + bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, position=0, value=field_value) - field_residue: FieldResidue = least_significant_bits(field_descriptor=bytes_field, match_pattern_length=pattern_length) - assert field_residue.length == expected_residue_length - assert field_residue.residue == expected_residue + field_residue: Buffer = least_significant_bits(field_descriptor=bytes_field, bit_length=field_value.bit_length - pattern.bit_length) + assert field_residue.bit_length == expected_residue.bit_length + assert field_residue.content == expected_residue.content diff --git a/tests/binary/__init__.py b/tests/binary/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/binary/test_buffer.py b/tests/binary/test_buffer.py new file mode 100644 index 0000000..6d21de2 --- /dev/null +++ b/tests/binary/test_buffer.py @@ -0,0 +1,249 @@ + +from microschc.binary.buffer import Buffer, Padding + +def test_shift(): + # left shift is larger than padding, prepend null bytes to buffer prior to shifting + # 10 bits left shift + # <|< < < < < < < <|< v | + # |7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0| (|) byte delimiter + # +|+ + + + + + + +|- 0 1 0 1 1 0 1| (-) existing padding + # |- - - - - - - 0|1 0 1 1 0 1 0 0|0 0 0 0 0 0 0 0| (+) extra bits required + buffer: Buffer = Buffer(content=bytes(b'\x2d'), bit_length=7, padding=Padding.LEFT) + shift_value = - 10 + + shifted_buffer: Buffer = buffer.shift(shift=shift_value, inplace=False) + assert len(shifted_buffer.content) == 3 + assert shifted_buffer.content == bytes(b'\x00\xb4\00') + assert shifted_buffer.bit_length == 17 + + # left shift is smaller than padding + # 10 bits left shift + # | < < v | | + # 7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0| (|) byte delimiter + # |- - - - - - - -|- - - - 1 0 0 0|0 0 1 0 1 1 0 1| (-) existing padding + # |- - - - - - - -|- - 1 0 0 0 0 0|1 0 1 1 0 1 0 0| (+) extra bits required + buffer: Buffer = Buffer(content=bytes(b'\x00\x08\x2d'), bit_length=12, padding=Padding.LEFT) + shift_value = -2 + + shifted_buffer = buffer.shift(shift=shift_value, inplace=False) + + assert len(shifted_buffer.content) == 3 + assert shifted_buffer.content == bytes(b'\x00\x20\xb4') + + # left shift, right padding + # 2 bits left shift + # | < <|v | | | + # |7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0| (|) byte delimiter + # |0 0 0 0 1 0 0 0|0 0 1 0 1 - - -|- - - - - - - -| (-) existing padding + # |0 0 0 0 1 0 0 0|0 0 1 0 1 0 0 -|- - - - - - - -| (+) extra bits required + buffer: Buffer = Buffer(content=bytes(b'\x08\x28\x00'), bit_length=13, padding=Padding.RIGHT) + shift_value: int = -2 + + shifted_buffer = buffer.shift(shift=shift_value, inplace=False) + assert len(shifted_buffer.content) == 3 + assert shifted_buffer.content == bytes(b'\x08\x28\x00') + assert shifted_buffer.bit_length == 15 + + # right shift, left padding + # 3 bits right shift + # | | v > > >| + # 7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0| (|) byte delimiter + # |- - - - - - - -|- - - - 1 0 0 0|0 0 1 0 1 1 0 1| (-) existing padding + # |- - - - - - - -|- - - - - - - 1|0 0 0 0 0 1 0 1| (+) extra bits required + buffer: Buffer = Buffer(content=bytes(b'\x00\x08\x2d'), bit_length=12, padding=Padding.LEFT) + shift_value = 3 + shifted_buffer = buffer.shift(shift=shift_value, inplace=False) + assert len(shifted_buffer.content) == 3 + assert shifted_buffer.content == bytes(b'\x00\x01\x05') + assert shifted_buffer.bit_length == 9 + + # right shift, left padding + # 6 bits right shift + # | | v > > > > > >| + # 7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0| (|) byte delimiter + # |- - - - - - - -|- - - - 1 0 0 0|0 0 1 0 1 1 0 1| (-) existing padding + # |- - - - - - - -|- - - - - - - -|- - 1 0 0 0 0 0| (+) extra bits required + buffer: Buffer = Buffer(content=bytes(b'\x00\x08\x2d'), bit_length=12, padding=Padding.LEFT) + shift_value = 6 + shifted_buffer = buffer.shift(shift=shift_value, inplace=False) + assert len(shifted_buffer.content) == 3 + assert shifted_buffer.content == bytes(b'\x00\x00\x20') + assert shifted_buffer.bit_length == 6 + +def test_trim_padding(): + # right padding trimming + # |7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0| (|) byte delimiter + # |0 0 0 0 1 0 0 0|0 0 1 0 1 - - -|- - - - - - - -| (-) existing padding + # |0 0 0 0 1 0 0 0|0 0 1 0 1 - - -| (-) expected result + buffer: Buffer = Buffer(content=bytes(b'\x08\x28\x00'), bit_length=13, padding=Padding.RIGHT) + + expected_buffer: Buffer = Buffer(content=bytes(b'\x08\x28'), bit_length=13, padding=Padding.RIGHT) + + assert buffer.trim(inplace=False) == expected_buffer + + +def test_repr(): + # 11 bits padding on the right + # |0 0 0 0 1 0 0 0|0 0 1 0 1 - - -|- - - - - - - -| (-) padding + buffer: Buffer = Buffer(content=bytes(b'\x08\x28\x00'), bit_length=13, padding=Padding.RIGHT) + repr: str = buffer.__repr__() + + assert repr == '[00001000 00101--- --------] | len: 13 | pad: 11 right' + + # 11 bits padding on the left + # |- - - - - - - -|- - - 0 1 0 0 0|0 0 1 0 1 0 0 0| (-) padding + buffer: Buffer = Buffer(content=bytes(b'\x00\x08\x28'), bit_length=13, padding=Padding.LEFT) + repr: str = buffer.__repr__() + + assert repr == '[-------- ---01000 00101000] | len: 13 | pad: 11 left' + + # 3 bits padding on the right + # |0 0 0 0 1 0 0 0|0 0 1 0 1 - - -| (-) padding + buffer: Buffer = Buffer(content=bytes(b'\x08\x28'), bit_length=13, padding=Padding.RIGHT) + repr: str = buffer.__repr__() + + assert repr == '[00001000 00101---] | len: 13 | pad: 3 right' + + # 3 bits padding on the left + # |- - - 0 1 0 0 0|0 0 1 0 1 0 0 0| (-) padding + buffer: Buffer = Buffer(content=bytes(b'\x08\x28'), bit_length=13, padding=Padding.LEFT) + repr: str = buffer.__repr__() + + assert repr == '[---01000 00101000] | len: 13 | pad: 3 left' + + # b'\x33\xff\x60' ('b'3\xff`') len: 24 padding: right + # |0 0 1 1 0 0 1 1| 1 1 1 1 1 1 1 1| 0 1 1 0 0 0 0 0| + buffer: Buffer = Buffer(content=bytes(b'\x33\xff\x60'), bit_length=24, padding=Padding.RIGHT) + repr: str = buffer.__repr__() + + assert repr == '[00110011 11111111 01100000] | len: 24 | pad: 0 right' + +def test_pad(): + # 11 bits padding on the right + # |0 0 0 0 1 0 0 0|0 0 1 0 1 - - -|- - - - - - - -| (-) padding + # |- - - - - - - -|- - - 0 0 0 0 1|0 0 0 0 0 1 0 1| (-) padding + buffer: Buffer = Buffer(content=bytes(b'\x08\x28\x00'), bit_length=13, padding=Padding.RIGHT) + + padded: Buffer = buffer.pad(padding=Padding.LEFT, inplace=False) + expected: Buffer = Buffer(content=b'\x00\x01\x05', bit_length=buffer.bit_length, padding=Padding.LEFT) + + assert padded == expected + + reverted: Buffer = padded.pad(padding=Padding.RIGHT, inplace=False) + assert reverted == buffer + + +def test_get(): + # retrieve bits 1 to 10 + # + # 0x00 0x01 0x0d + # |- - - - - - - -| - - - 0 0 0 0 1|0 0 0 0 1 1 0 1| (-) padding (11 bits padding on the left) + # + + + + + + + + + (+) bits to output + # --> |- - - - - - 0 0|0 0 1 0 0 0 0 1| + # 0x00 0x41 + buffer: Buffer = Buffer(content=bytes(b'\x00\x01\x0d'), bit_length=13, padding=Padding.LEFT) + expected: Buffer = Buffer(content=bytes(b'\x00\x21'), bit_length=9, padding=Padding.LEFT) + buffer_subset: Buffer = buffer[1:10] + assert buffer_subset == expected + assert buffer_subset.padding == Padding.LEFT + + # retrieve bits 1 to 14 --> retrieve 1 to 13 + # + # 0x00 0x01 0x0d + # |- - - - - - - -| - - - 0 0 0 0 1|0 0 0 0 1 1 0 1| (-) padding (11 bits padding on the left) + # + + + + + + + + + + + + (+) bits to output + # --> | - - - - 0 0 0 1|0 0 0 0 1 1 0 1| + # 0x01 0x0d + + buffer_subset: Buffer = buffer[1:13] + expected: Buffer = Buffer(content=b'\x01\x0d', bit_length=12, padding=Padding.LEFT) + assert buffer_subset == expected + assert buffer_subset.padding == Padding.LEFT + + # retrieve bits 0 to 13 + # + # 0x00 0x01 0x0d + # |- - - - - - - -| - - - 0 0 0 0 1|0 0 0 0 1 1 0 1| (-) padding (11 bits padding on the left) + # + + + + + + + + + + + + + (+) bits to output + # --> |- - - 0 0 0 0 1|0 0 0 0 1 1 0 1| + # 0x01 0x41 + buffer: Buffer = Buffer(content=bytes(b'\x00\x01\x0d'), bit_length=13, padding=Padding.LEFT) + expected: Buffer = Buffer(content=bytes(b'\x01\x0d'), bit_length=13, padding=Padding.LEFT) + buffer_subset: Buffer = buffer[0:13] + assert buffer_subset == expected + assert buffer_subset.padding == Padding.LEFT + + # retrieve last 4 bits (9 to 13) + # + # 0x00 0x01 0x0d + # |- - - - - - - -| - - - 0 0 0 0 1|0 0 0 0 1 1 0 1| (-) padding (11 bits padding on the left) + # + + + + (+) bits to output + # --> |- - - - 1 1 0 1| + # 0x01 + buffer: Buffer = Buffer(content=bytes(b'\x00\x01\x0d'), bit_length=13, padding=Padding.LEFT) + expected: Buffer = Buffer(content=bytes(b'\x0d'), bit_length=4, padding=Padding.LEFT) + buffer_subset: Buffer = buffer[-4:] + assert buffer_subset == expected + assert buffer_subset.padding == Padding.LEFT + + + # retrieve bits 1 to 10 + # + # 0x08 0x68 0x00 + # |0 0 0 0 1 0 0 0|0 1 1 0 1 - - -|- - - - - - - -| (-) padding (11 bits padding on the right) + # + + + + + + + + + (+) bits to output + # --> |0 0 0 1 0 0 0 0|1 - - - - - - -| + # 0x10 0x80 + buffer: Buffer = Buffer(content=bytes(b'\x08\x68'), bit_length=13, padding=Padding.RIGHT) + expected: Buffer = Buffer(content=bytes(b'\x10\x80'), bit_length=9, padding=Padding.RIGHT) + buffer_subset: Buffer = buffer[1:10] + assert buffer_subset == expected + assert buffer_subset.padding == Padding.RIGHT + + # retrieve bits 0 to 13 + # + # 0x08 0x68 0x00 + # |0 0 0 0 1 0 0 0|0 1 1 0 1 - - -|- - - - - - - -| (-) padding (11 bits padding on the right) + # + + + + + + + + + + + + (+) bits to output + # --> |0 0 0 1 0 0 0 0|0 1 1 0 1 - - -| + # 0x08 0x68 + buffer: Buffer = Buffer(content=bytes(b'\x08\x68'), bit_length=13, padding=Padding.RIGHT) + expected: Buffer = Buffer(content=bytes(b'\x08\x68'), bit_length=13, padding=Padding.RIGHT) + buffer_subset: Buffer = buffer[0:13] + assert buffer_subset == expected + assert buffer_subset.padding == Padding.RIGHT + + + # retrieve last 4 bits (9 to 13) + # + # 0x08 0x68 0x00 + # |0 0 0 0 1 0 0 0|0 1 1 0 1 - - -|- - - - - - - -| (-) padding (11 bits padding on the right) + # + + + + (+) bits to output + # --> |1 1 0 1 - - - -| + # 0xd0 + buffer: Buffer = Buffer(content=bytes(b'\x01\x68'), bit_length=13, padding=Padding.RIGHT) + expected: Buffer = Buffer(content=bytes(b'\xd0'), bit_length=4, padding=Padding.RIGHT) + buffer_subset: Buffer = buffer[-4:] + assert buffer_subset == expected + assert buffer_subset.padding == Padding.RIGHT + +def test_add(): + left: Buffer = Buffer(content=b'\x40', bit_length=2, padding=Padding.RIGHT) + right: Buffer = Buffer(content=b'\x80', bit_length=2, padding=Padding.RIGHT) + left_right = left + right + expected: Buffer = Buffer(content=b'\x60', bit_length=4, padding=Padding.RIGHT) + assert left_right == expected + + left: Buffer = Buffer(content=b'\x40', bit_length=2, padding=Padding.RIGHT) + right: Buffer = Buffer(content=b'\x80', bit_length=8, padding=Padding.RIGHT) + left_right = left + right + expected: Buffer = Buffer(content=b'\x60\x00', bit_length=10, padding=Padding.RIGHT) + assert left_right == expected + + left: Buffer = Buffer(content=b'\x60', bit_length=4, padding=Padding.RIGHT) + right: Buffer = Buffer(content=b'\x80', bit_length=4, padding=Padding.RIGHT) + left_right = left + right + expected: Buffer = Buffer(content=b'\x68', bit_length=8, padding=Padding.RIGHT) + assert left_right == expected + + diff --git a/tests/compressor/test_compressor.py b/tests/compressor/test_compressor.py index cbb7138..1e2e9b5 100644 --- a/tests/compressor/test_compressor.py +++ b/tests/compressor/test_compressor.py @@ -1,11 +1,12 @@ from typing import List -from microschc.compressor.compressor import _compact_left, _encode_length, compress +from microschc.binary.buffer import Buffer, Padding +from microschc.compressor.compressor import _encode_length, compress from microschc.parser.factory import factory from microschc.parser.parser import PacketParser from microschc.parser.protocol.coap import CoAPFields from microschc.parser.protocol.ipv6 import IPv6Fields from microschc.parser.protocol.udp import UDPFields -from microschc.rfc8724 import DirectionIndicator, MatchMapping, PacketDescriptor, Pattern, RuleDescriptor, RuleFieldDescriptor +from microschc.rfc8724 import DirectionIndicator, MatchMapping, PacketDescriptor, RuleDescriptor, RuleFieldDescriptor from microschc.rfc8724extras import ParserDefinitions, StacksImplementation from microschc.rfc8724 import MatchingOperator as MO from microschc.rfc8724 import CompressionDecompressionAction as CDA @@ -31,82 +32,78 @@ def test_compress(): field_descriptors_1: List[RuleFieldDescriptor] = [ RuleFieldDescriptor( id=IPv6Fields.VERSION, length=4, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'\x06', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x06', bit_length=4), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor( id=IPv6Fields.TRAFFIC_CLASS, length=8, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'\x00', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x00', bit_length=8), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor( id=IPv6Fields.FLOW_LABEL, length=20, position=0, direction=DirectionIndicator.UP, - target_value=b'\x00\xef\x2d', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x00\xef\x2d', bit_length=20), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor( id=IPv6Fields.PAYLOAD_LENGTH, length=16, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=16), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor( id=IPv6Fields.NEXT_HEADER, length=8, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'\x11', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x11', bit_length=8), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor( id=IPv6Fields.HOP_LIMIT, length=8, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'\x40', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x40', bit_length=8), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor( id=IPv6Fields.SRC_ADDRESS, length=128, position=0, direction=DirectionIndicator.UP, - target_value=Pattern(pattern=b'\x20\x01\x0d\xb8\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00', length=120), + target_value=Buffer(content=b'\x20\x01\x0d\xb8\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00', bit_length=120), matching_operator=MO.MSB, compression_decompression_action=CDA.LSB), RuleFieldDescriptor(id=IPv6Fields.DST_ADDRESS, length=128, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=MatchMapping(index_length=2, forward_mapping={b"\x20\x01\x0d\xb8\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20":0}), + target_value=MatchMapping(forward_mapping={ + Buffer(content=b"\x20\x01\x0d\xb8\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20", bit_length=128):Buffer(content=b'\x00', bit_length=2) + }), matching_operator=MO.MATCH_MAPPING, compression_decompression_action=CDA.MAPPING_SENT), RuleFieldDescriptor(id=UDPFields.SOURCE_PORT, length=16, position=0, direction=DirectionIndicator.UP, - target_value=b'\xd1\x00', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\xd1\x00', bit_length=16), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor(id=UDPFields.DESTINATION_PORT, length=16, position=0, direction=DirectionIndicator.UP, - target_value=b'\x16\x33', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x16\x33', bit_length=16), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor(id=UDPFields.LENGTH, length=16, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=UDPFields.CHECKSUM, length=16, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=CoAPFields.VERSION, length=2, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'\x01', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x01', bit_length=2), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor(id=CoAPFields.TYPE, length=2, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'\x02', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x02', bit_length=2), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor(id=CoAPFields.TOKEN_LENGTH, length=4, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=CoAPFields.CODE, length=8, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=CoAPFields.MESSAGE_ID, length=16, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=CoAPFields.TOKEN, length=0, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=CoAPFields.OPTION_DELTA, length=4, position=0, direction=DirectionIndicator.UP, - target_value=b'\x0c', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x0c', bit_length=0), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor(id=CoAPFields.OPTION_LENGTH, length=4, position=0, direction=DirectionIndicator.UP, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=CoAPFields.OPTION_VALUE, length=0, position=0, direction=DirectionIndicator.UP, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=CoAPFields.PAYLOAD_MARKER, length=8, position=0, direction=DirectionIndicator.UP, - target_value=b'\xff', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\xff', bit_length=8), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor(id=ParserDefinitions.PAYLOAD, length=0, position=0, direction=DirectionIndicator.UP, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT) + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT) ] - rule_descriptor_1: RuleDescriptor = RuleDescriptor(id=b'\x03', id_length=2, field_descriptors=field_descriptors_1) - - schc_packet, schc_packet_length = compress(packet_descriptor=packet_descriptor, rule_descriptor=rule_descriptor_1) - schc_packet = _compact_left( - buffer=schc_packet, buffer_length=schc_packet_length, - bytefield=packet_descriptor.payload, bytefield_length=8*len(packet_descriptor.payload) - ) - assert schc_packet == b'\xc0\x1a\x00\x80\x06\x85\xc2\x18'\ - b'\x45\x22\xf6\xf4\x0b\x83\x00\xef'\ - b'\xee\x66\x29\x12\x21\x86\xe5\xb7'\ - b'\xb2\x26\x26\xe2\x23\xa2\x22\xf3'\ - b'\x62\xf2\x22\xc2\x26\xe2\x23\xa2'\ - b'\x23\x02\xf3\x02\x22\xc2\x27\x62'\ - b'\x23\xa3\x53\x42\xe3\x07\xd2\xc7'\ - b'\xb2\x26\xe2\x23\xa2\x23\x02\xf3'\ - b'\x12\x22\xc2\x27\x62\x23\xa3\x43'\ - b'\x82\xe3\x07\xd2\xc7\xb2\x26\xe2'\ - b'\x23\xa2\x23\x02\xf3\x52\x22\xc2'\ - b'\x27\x62\x23\xa3\x13\x63\x63\x63'\ - b'\x23\x63\x33\x33\x33\x97\xd5\xd0' + rule_descriptor_1: RuleDescriptor = RuleDescriptor(id=Buffer(content=b'\x03', bit_length=2), field_descriptors=field_descriptors_1) + + schc_packet = compress(packet_descriptor=packet_descriptor, rule_descriptor=rule_descriptor_1) + schc_packet += packet_descriptor.payload + assert schc_packet == Buffer(content= b'\xc0\x1a\x00\x80\x06\x85\xc2\x18\x45\x22\xf6\xf4' \ + b'\x0b\x83\x00\xef\xee\x66\x29\x12\x21\x86\xe5\xb7' \ + b'\xb2\x26\x26\xe2\x23\xa2\x22\xf3\x62\xf2\x22\xc2' \ + b'\x26\xe2\x23\xa2\x23\x02\xf3\x02\x22\xc2\x27\x62' \ + b'\x23\xa3\x53\x42\xe3\x07\xd2\xc7\xb2\x26\xe2\x23' \ + b'\xa2\x23\x02\xf3\x12\x22\xc2\x27\x62\x23\xa3\x43' \ + b'\x82\xe3\x07\xd2\xc7\xb2\x26\xe2\x23\xa2\x23\x02' \ + b'\xf3\x52\x22\xc2\x27\x62\x23\xa3\x13\x63\x63\x63' \ + b'\x23\x63\x33\x33\x33\x97\xd5\xd0', + bit_length=828, padding=Padding.RIGHT) def test_encode_length(): @@ -115,167 +112,53 @@ def test_encode_length(): length: int = 5 expected_encoded_length: bytes = b'\x05' expected_encoded_length_length: int = 4 - encoded_length, encoded_length_length = _encode_length(length=length) + encoded_length: Buffer = _encode_length(length=length) - assert encoded_length == expected_encoded_length - assert encoded_length_length == expected_encoded_length_length + assert encoded_length.content == expected_encoded_length + assert encoded_length.bit_length == expected_encoded_length_length # test #2: length = 14 length: int = 14 expected_encoded_length: bytes = b'\x0e' expected_encoded_length_length: int = 4 - encoded_length, encoded_length_length = _encode_length(length=length) + encoded_length: Buffer = _encode_length(length=length) - assert encoded_length == expected_encoded_length - assert encoded_length_length == expected_encoded_length_length + assert encoded_length.content == expected_encoded_length + assert encoded_length.bit_length == expected_encoded_length_length # test #3: length = 15 length: int = 15 expected_encoded_length: bytes = b'\x0f\x0f' expected_encoded_length_length: int = 12 - encoded_length, encoded_length_length = _encode_length(length=length) + encoded_length: Buffer = _encode_length(length=length) - assert encoded_length == expected_encoded_length - assert encoded_length_length == expected_encoded_length_length + assert encoded_length.content == expected_encoded_length + assert encoded_length.bit_length == expected_encoded_length_length # test #4: length = 254 length: int = 254 expected_encoded_length: bytes = b'\x0f\xfe' expected_encoded_length_length: int = 12 - encoded_length, encoded_length_length = _encode_length(length=length) + encoded_length: Buffer = _encode_length(length=length) - assert encoded_length == expected_encoded_length - assert encoded_length_length == expected_encoded_length_length + assert encoded_length.content == expected_encoded_length + assert encoded_length.bit_length == expected_encoded_length_length # test #5: length = 255 length: int = 255 expected_encoded_length: bytes = b'\x0f\xff\x00\xff' expected_encoded_length_length: int = 28 - encoded_length, encoded_length_length = _encode_length(length=length) + encoded_length: Buffer = _encode_length(length=length) + + assert encoded_length.content == expected_encoded_length + assert encoded_length.bit_length == expected_encoded_length_length # test #6: length = 65535 length: int = 65535 expected_encoded_length: bytes = b'\x0f\xff\xff\xff' expected_encoded_length_length: int = 28 - encoded_length, encoded_length_length = _encode_length(length=length) - - assert encoded_length == expected_encoded_length - assert encoded_length_length == expected_encoded_length_length - - - -def test_compact_left(): + encoded_length: Buffer = _encode_length(length=length) - # test #1: bytefield needs 1 bit left shifting before concatenation - # - # buffer bytefield - # |-----------------------------------------------| |-----------------------------------------------| - # offset trailing - # = 3 zeros = 5 offset = 4 - # |-------------------------------|-----|---------| + |-------| - # 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 1 1 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 - # | byte0 | byte1 | byte2 | | byte 0 | byte 1 | byte 2 | - # 0x33 0xff 0x60 0x0c 0x33 0xff - # = - # | old buffer bytefield offset - # |-------------------------------------|---------------------------------------|-| - # 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 1 1 1 1 0 0 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 - # | byte0 | byte1 | byte2 | byte3 | byte4 | - # 0x33 0xff 0x78 0x67 0xfe - buffer: bytes = b'\x33\xff\x60' - trailing_zeros: int = 5 - bytefield = b'\x0c\x33\xff' - expected_buffer: bytes = b'\x33\xff\x78\x67\xfe' - compacted = _compact_left(buffer=buffer, buffer_length=19, bytefield=bytefield, bytefield_length=20) - assert compacted == expected_buffer - - - # test #2: bytefield needs 2 bits right shifting before concatenation - # - # buffer bytefield - # |-----------------------------------------------| |-----------------------------------------------| - # offset trailing - # = 6 zeros = 2 offset = 4 - # |-------------------------------|-----------|---| + |-------| - # 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 1 1 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 - # | byte0 | byte1 | byte2 | | byte 0 | byte 1 | byte 2 | - # 0x33 0xff 0x60 0x0c 0x33 0xff - # = - # | old buffer bytefield offset - # |-------------------------------------------|---------------------------------------|-----------| - # 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 1 1 0 0 0 1 1 0 0 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 - # | byte0 | byte1 | byte2 | byte3 | byte4 | byte5 | - # 0x33 0xff 0x63 0x0c 0xff 0xc0 - buffer: bytes = b'\x33\xff\x60' - trailing_zeros: int = 2 - bytefield = b'\x0c\x33\xff' - expected_buffer: bytes = b'\x33\xff\x63\x0c\xff\xc0' - compacted = _compact_left(buffer=buffer, buffer_length=22, bytefield=bytefield, bytefield_length=20) - assert compacted == expected_buffer - - # test #3: bytefield and buffer have the same non-zero offset - # - # buffer bytefield - # |-----------------------------------------------| |-----------------------------------------------| - # offset trailing - # = 4 zeros = 4 offset = 4 - # |-------------------------------|-------|-------| + |-------| - # 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 1 1 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 - # | byte0 | byte1 | byte2 | | byte 0 | byte 1 | byte 2 | - # 0x33 0xff 0x60 0x0c 0x33 0xff - # = - # | old buffer bytefield - # |---------------------------------------|---------------------------------------| - # 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 1 1 0 1 1 0 0 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 - # | byte0 | byte1 | byte2 | byte3 | byte4 | - # 0x33 0xff 0x6c 0x33 0xff - buffer: bytes = b'\x33\xff\x60' - trailing_zeros: int = 4 - bytefield = b'\x0c\x33\xff' - expected_buffer: bytes = b'\x33\xff\x6c\x33\xff' - compacted = _compact_left(buffer=buffer, buffer_length=20, bytefield=bytefield, bytefield_length=20) - assert compacted == expected_buffer - - # test #4: bytefield and buffer have the same zero offset - # - # buffer bytefield - # |-----------------------------------------------| |-----------------------------------------------| - # 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 1 1 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 - # | byte0 | byte1 | byte2 | | byte 0 | byte 1 | byte 2 | - # 0x33 0xff 0x60 0x0c 0x33 0xff - # = - # buffer bytefield - # |-----------------------------------------------|-----------------------------------------------| - # 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 1 1 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 - # | byte0 | byte1 | byte2 | byte 3 | byte 4 | byte 5 | - # 0x33 0xff 0x60 0x0c 0x33 0xff - - buffer: bytes = b'\x33\xff\x60' - trailing_zeros: int = 0 - bytefield = b'\x0c\x33\xff' - expected_buffer: bytes = b'\x33\xff\x60\x0c\x33\xff' - compacted = _compact_left(buffer=buffer, buffer_length=24, bytefield=bytefield, bytefield_length=24) - assert compacted == expected_buffer - - # test #5: bytefield needs 2 bits left shifting and buffer has zero offset - # - # buffer bytefield - # |-----------------------------------------------| |-----------------------------------------------| - # offset = 2 - # |---| - # 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 1 1 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 - # | byte0 | byte1 | byte2 | | byte 0 | byte 1 | byte 2 | - # 0x33 0xff 0x60 0x0c 0x33 0xff - # = - # buffer bytefield - # |-----------------------------------------------|-----------------------------------------------| - # 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 1 1 0 0 0 0 0 0 0 1 1 0 0 0 0 1 1 0 0 1 1 1 1 1 1 1 1 1 1 0 0 - # | byte0 | byte1 | byte2 | byte 3 | byte 4 | byte 5 | - # 0x33 0xff 0x60 0x30 0xcf 0xfc - - buffer: bytes = b'\x33\xff\x60' - bytefield = b'\x0c\x33\xff' - expected_buffer: bytes = b'\x33\xff\x60\x30\xcf\xfc' - compacted = _compact_left(buffer=buffer, buffer_length=24, bytefield=bytefield, bytefield_length=22) - assert compacted == expected_buffer \ No newline at end of file + assert encoded_length.content == expected_encoded_length + assert encoded_length.bit_length == expected_encoded_length_length diff --git a/tests/decompressor/__init__.py b/tests/decompressor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/decompressor/test_decompressor.py b/tests/decompressor/test_decompressor.py new file mode 100644 index 0000000..0d82c77 --- /dev/null +++ b/tests/decompressor/test_decompressor.py @@ -0,0 +1,102 @@ +from typing import List +from microschc.binary.buffer import Buffer, Padding +from microschc.parser.protocol.coap import CoAPFields +from microschc.parser.protocol.ipv6 import IPv6Fields +from microschc.parser.protocol.udp import UDPFields +from microschc.rfc8724 import MatchMapping, MatchingOperator as MO, RuleDescriptor +from microschc.rfc8724 import CompressionDecompressionAction as CDA +from microschc.rfc8724 import DirectionIndicator, RuleFieldDescriptor +from microschc.rfc8724extras import ParserDefinitions + +from microschc.decompressor.decompressor import decompress + + + +def test_decompress(): + valid_stack_packet:bytes = bytes( + b"\x60\x00\xef\x2d\x00\x68\x11\x40\x20\x01\x0d\xb8\x00\x0a\x00\x00" \ + b"\x00\x00\x00\x00\x00\x00\x00\x02\x20\x01\x0d\xb8\x00\x0a\x00\x00" \ + b"\x00\x00\x00\x00\x00\x00\x00\x20\xd1\x00\x16\x33\x00\x68\x5c\x21" \ + b"\x68\x45\x22\xf6\xb8\x30\x0e\xfe\xe6\x62\x91\x22\xc1\x6e\xff\x5b" \ + b"\x7b\x22\x62\x6e\x22\x3a\x22\x2f\x36\x2f\x22\x2c\x22\x6e\x22\x3a" \ + b"\x22\x30\x2f\x30\x22\x2c\x22\x76\x22\x3a\x35\x34\x2e\x30\x7d\x2c" \ + b"\x7b\x22\x6e\x22\x3a\x22\x30\x2f\x31\x22\x2c\x22\x76\x22\x3a\x34" \ + b"\x38\x2e\x30\x7d\x2c\x7b\x22\x6e\x22\x3a\x22\x30\x2f\x35\x22\x2c" \ + b"\x22\x76\x22\x3a\x31\x36\x36\x36\x32\x36\x33\x33\x33\x39\x7d\x5d" + ) + valid_buffer: Buffer = Buffer(content=valid_stack_packet, bit_length=8*len(valid_stack_packet), padding=Padding.RIGHT) + + field_descriptors_1: List[RuleFieldDescriptor] = [ + RuleFieldDescriptor( + id=IPv6Fields.VERSION, length=4, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=Buffer(content=b'\x06', bit_length=4), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + RuleFieldDescriptor( + id=IPv6Fields.TRAFFIC_CLASS, length=8, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=Buffer(content=b'\x00', bit_length=8), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + RuleFieldDescriptor( + id=IPv6Fields.FLOW_LABEL, length=20, position=0, direction=DirectionIndicator.UP, + target_value=Buffer(content=b'\x00\xef\x2d', bit_length=20), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + RuleFieldDescriptor( + id=IPv6Fields.PAYLOAD_LENGTH, length=16, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=Buffer(content=b'', bit_length=16), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + RuleFieldDescriptor( + id=IPv6Fields.NEXT_HEADER, length=8, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=Buffer(content=b'\x11', bit_length=8), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + RuleFieldDescriptor( + id=IPv6Fields.HOP_LIMIT, length=8, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=Buffer(content=b'\x40', bit_length=8), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + RuleFieldDescriptor( + id=IPv6Fields.SRC_ADDRESS, length=128, position=0, direction=DirectionIndicator.UP, + target_value=Buffer(content=b'\x20\x01\x0d\xb8\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00', bit_length=120), + matching_operator=MO.MSB, compression_decompression_action=CDA.LSB), + RuleFieldDescriptor(id=IPv6Fields.DST_ADDRESS, length=128, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=MatchMapping(forward_mapping={ + Buffer(content=b"\x20\x01\x0d\xb8\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20", bit_length=128):Buffer(content=b'\x00', bit_length=2) + }), + matching_operator=MO.MATCH_MAPPING, compression_decompression_action=CDA.MAPPING_SENT), + + RuleFieldDescriptor(id=UDPFields.SOURCE_PORT, length=16, position=0, direction=DirectionIndicator.UP, + target_value=Buffer(content=b'\xd1\x00', bit_length=16), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + RuleFieldDescriptor(id=UDPFields.DESTINATION_PORT, length=16, position=0, direction=DirectionIndicator.UP, + target_value=Buffer(content=b'\x16\x33', bit_length=16), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + RuleFieldDescriptor(id=UDPFields.LENGTH, length=16, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + RuleFieldDescriptor(id=UDPFields.CHECKSUM, length=16, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + + RuleFieldDescriptor(id=CoAPFields.VERSION, length=2, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=Buffer(content=b'\x01', bit_length=2), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + RuleFieldDescriptor(id=CoAPFields.TYPE, length=2, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=Buffer(content=b'\x02', bit_length=2), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + RuleFieldDescriptor(id=CoAPFields.TOKEN_LENGTH, length=4, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + RuleFieldDescriptor(id=CoAPFields.CODE, length=8, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + RuleFieldDescriptor(id=CoAPFields.MESSAGE_ID, length=16, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + RuleFieldDescriptor(id=CoAPFields.TOKEN, length=0, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + RuleFieldDescriptor(id=CoAPFields.OPTION_DELTA, length=4, position=0, direction=DirectionIndicator.UP, + target_value=Buffer(content=b'\x0c', bit_length=4), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + RuleFieldDescriptor(id=CoAPFields.OPTION_LENGTH, length=4, position=0, direction=DirectionIndicator.UP, + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + RuleFieldDescriptor(id=CoAPFields.OPTION_VALUE, length=0, position=0, direction=DirectionIndicator.UP, + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + RuleFieldDescriptor(id=CoAPFields.PAYLOAD_MARKER, length=8, position=0, direction=DirectionIndicator.UP, + target_value=Buffer(content=b'\xff', bit_length=8), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + ] + rule_descriptor_1: RuleDescriptor = RuleDescriptor(id=Buffer(content=b'\x03', bit_length=2), field_descriptors=field_descriptors_1) + + schc_packet: Buffer = Buffer(content= b'\xc0\x1a\x00\x80\x06\x85\xc2\x18\x45\x22\xf6\xf4' \ + b'\x0b\x83\x00\xef\xee\x66\x29\x12\x21\x86\xe5\xb7' \ + b'\xb2\x26\x26\xe2\x23\xa2\x22\xf3\x62\xf2\x22\xc2' \ + b'\x26\xe2\x23\xa2\x23\x02\xf3\x02\x22\xc2\x27\x62' \ + b'\x23\xa3\x53\x42\xe3\x07\xd2\xc7\xb2\x26\xe2\x23' \ + b'\xa2\x23\x02\xf3\x12\x22\xc2\x27\x62\x23\xa3\x43' \ + b'\x82\xe3\x07\xd2\xc7\xb2\x26\xe2\x23\xa2\x23\x02' \ + b'\xf3\x52\x22\xc2\x27\x62\x23\xa3\x13\x63\x63\x63' \ + b'\x23\x63\x33\x33\x33\x97\xd5\xd0', + bit_length=828, padding=Padding.RIGHT) + + decompressed_packet = decompress(schc_packet=schc_packet, direction=DirectionIndicator.UP, rule=rule_descriptor_1) + assert decompressed_packet.content == valid_stack_packet \ No newline at end of file diff --git a/tests/matching/test_operators.py b/tests/matching/test_operators.py index 36d4498..b89a09b 100644 --- a/tests/matching/test_operators.py +++ b/tests/matching/test_operators.py @@ -1,5 +1,6 @@ from microschc.matching.operators import equal, ignore, most_significant_bits, match_mapping from microschc.rfc8724 import FieldDescriptor, MatchMapping +from microschc.binary.buffer import Buffer SOME_ID = 'ID' @@ -10,11 +11,11 @@ def test_equal(): """ # test on bytes values - bytes_target_value = b'\x13\xff' - other_bytes_value_of_same_length = b'\x14\xff' - same_bytes_value_of_different_length = b'\x00\x13\xff' - different_bytes_value_of_different_length = b'\x00\x14\xff' - bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, length=16, position=0, value=bytes_target_value) + bytes_target_value = Buffer(content=b'\x13\xff', bit_length=16) + other_bytes_value_of_same_length = Buffer(content=b'\x14\xff', bit_length=16) + same_bytes_value_of_different_length = Buffer(content=b'\x00\x13\xff', bit_length=24) + different_bytes_value_of_different_length = Buffer(content=b'\x00\x14\xff', bit_length=24) + bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, position=0, value=bytes_target_value) assert equal(bytes_field, target_value=bytes_target_value) == True assert equal(bytes_field, target_value=other_bytes_value_of_same_length) == False assert equal(bytes_field, target_value=same_bytes_value_of_different_length) == False @@ -24,64 +25,54 @@ def test_ignore(): """test: ignore matching operator Test that matching operator always returns True """ - any_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, length=16, position=0, value=b'') + any_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, position=0, value=Buffer(content=b'', bit_length=16)) assert ignore(any_field) == True def test_most_significant_bits(): """test: MSB(x) matching operator """ - pattern: bytes = b'\x01\x9f\xf9' - pattern_length = 17 - # test on: # - value matching the pattern # - value is not byte-aligned # - residue is not byte-aligned # - pattern is not byte-aligned - pattern: bytes = b'\x01\x9f\xf9' - pattern_length = 17 - field_value: bytes = b'\x33\xff\x23\xdb\xda' - field_length: int = 38 + pattern: Buffer = Buffer(content=b'\x01\x9f\xf9', bit_length=17) + field_value: Buffer = Buffer(content=b'\x33\xff\x23\xdb\xda', bit_length=38) - bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, length=field_length, position=0, value=field_value) - assert most_significant_bits(bytes_field, pattern_length=pattern_length, pattern=pattern) == True + bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, position=0, value=field_value) + assert most_significant_bits(bytes_field, pattern=pattern) == True # test on: # - value matching the pattern # - value is byte-aligned # - pattern is not byte-aligned - pattern: bytes = b'\x33\xff\x23' - pattern_length = 24 - field_value: bytes = b'\x33\xff\x23\xdb\xda' - field_length: int = 40 + pattern: Buffer = Buffer(content=b'\x33\xff\x23', bit_length=24) + field_value: Buffer = Buffer(content=b'\x33\xff\x23\xdb\xda', bit_length=40) - bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, length=field_length, position=0, value=field_value) - assert most_significant_bits(bytes_field, pattern_length=pattern_length, pattern=pattern) == True + bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, position=0, value=field_value) + assert most_significant_bits(bytes_field, pattern=pattern) == True # test on: # - value not matching the pattern # - value is not byte-aligned # - pattern is not byte-aligned - pattern: bytes = b'\x01\x9f\xf9' - pattern_length = 17 - field_value: bytes = b'\x34\xff\x23\xdb\xda' - field_length: int = 38 + pattern: Buffer = Buffer(content=b'\x01\x9f\xf9', bit_length=17) + field_value: Buffer = Buffer(content=b'\x34\xff\x23\xdb\xda', bit_length=38) + - bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, length=field_length, position=0, value=field_value) - assert most_significant_bits(bytes_field, pattern_length=pattern_length, pattern=pattern) == False + bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, position=0, value=field_value) + assert most_significant_bits(bytes_field, pattern=pattern) == False # test on: # - value not matching the pattern # - value is not byte-aligned # - pattern is not byte-aligned - pattern: bytes = b'\x01\x9f\xf9' - pattern_length = 17 - field_value: bytes = b'\xf4\xff\x23\xdb\xda' - field_length: int = 38 + pattern: Buffer = Buffer(content=b'\x01\x9f\xf9', bit_length=17) + field_value: Buffer = Buffer(content=b'\xf4\xff\x23\xdb\xda', bit_length=38) - bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, length=field_length, position=0, value=field_value) - assert most_significant_bits(bytes_field, pattern_length=pattern_length, pattern=pattern) == False + bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, position=0, value=field_value) + assert most_significant_bits(bytes_field, pattern=pattern) == False def test_match_mapping(): """test: match-mapping operator @@ -89,12 +80,17 @@ def test_match_mapping(): """ - bytes_mapping: MatchMapping = MatchMapping(index_length=2, forward_mapping={b'\xff\x13':1 , b'\xff\xff\x00':2, b'\x00':3, b'\x0e':4}) + bytes_mapping: MatchMapping = MatchMapping(forward_mapping={ + Buffer(content=b'\xff\x13', bit_length=16): Buffer(content=b'\x01', bit_length=2), + Buffer(content=b'\xff\xff\x00', bit_length=24): Buffer(content=b'\x02', bit_length=2), + Buffer(content=b'\x00', bit_length=8): Buffer(content=b'\x03', bit_length=2), + Buffer(content=b'\x0e', bit_length=8): Buffer(content=b'\x04', bit_length=2) + }) # testing on bytes fields - matching_bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, length=16, position=0, value=b'\xff\x13') + matching_bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, position=0, value=Buffer(content=b'\xff\x13', bit_length=16)) assert match_mapping(matching_bytes_field, target_values=bytes_mapping) == True - non_matching_bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, length=16, position=0, value=b'\xff\x15') + non_matching_bytes_field: FieldDescriptor = FieldDescriptor(id=SOME_ID, position=0, value=Buffer(content=b'\xff\x15', bit_length=16)) assert match_mapping(non_matching_bytes_field, target_values=bytes_mapping) == False diff --git a/tests/parser/protocol/test_parser_coap.py b/tests/parser/protocol/test_parser_coap.py index ce0a1b4..6b17128 100644 --- a/tests/parser/protocol/test_parser_coap.py +++ b/tests/parser/protocol/test_parser_coap.py @@ -1,5 +1,6 @@ from microschc.parser.protocol.coap import CoAPFields, CoAPParser from microschc.rfc8724 import FieldDescriptor, HeaderDescriptor +from microschc.binary.buffer import Buffer def test_coap_parser_import(): """test: IPv6 header parser import and instanciation @@ -79,39 +80,33 @@ def test_coap_parser_parse(): # assert field descriptors match CoAP header content version_fd:FieldDescriptor = coap_header_descriptor.fields[0] assert version_fd.id == CoAPFields.VERSION - assert version_fd.length == 2 assert version_fd.position == 0 - assert version_fd.value == b'\x01' + assert version_fd.value == Buffer(content=b'\x01', bit_length=2) type_fd:FieldDescriptor = coap_header_descriptor.fields[1] assert type_fd.id == CoAPFields.TYPE - assert type_fd.length == 2 assert type_fd.position == 0 - assert type_fd.value == b'\x00' + assert type_fd.value == Buffer(content=b'\x00', bit_length=2) token_length_fd:FieldDescriptor = coap_header_descriptor.fields[2] assert token_length_fd.id == CoAPFields.TOKEN_LENGTH - assert token_length_fd.length == 4 assert token_length_fd.position == 0 - assert token_length_fd.value == b'\x08' + assert token_length_fd.value == Buffer(content=b'\x08', bit_length=4) code_fd:FieldDescriptor = coap_header_descriptor.fields[3] assert code_fd.id == CoAPFields.CODE - assert code_fd.length == 8 assert code_fd.position == 0 - assert code_fd.value == b'\x02' + assert code_fd.value == Buffer(content=b'\x02', bit_length=8) message_id_fd:FieldDescriptor = coap_header_descriptor.fields[4] assert message_id_fd.id == CoAPFields.MESSAGE_ID - assert message_id_fd.length == 16 assert message_id_fd.position == 0 - assert message_id_fd.value == b'\x84\x99' + assert message_id_fd.value == Buffer(content=b'\x84\x99', bit_length=16) token_fd:FieldDescriptor = coap_header_descriptor.fields[5] assert token_fd.id == CoAPFields.TOKEN - assert token_fd.length == 64 assert token_fd.position == 0 - assert token_fd.value == b'\x74\xcd\xe8\xcb\x4e\x8c\x0d\xb7' + assert token_fd.value == Buffer(content=b'\x74\xcd\xe8\xcb\x4e\x8c\x0d\xb7', bit_length=64) # TODO: assert the list of options field descriptors match the CoAP options diff --git a/tests/parser/protocol/test_parser_ipv6.py b/tests/parser/protocol/test_parser_ipv6.py index f8956a4..7eb0b13 100644 --- a/tests/parser/protocol/test_parser_ipv6.py +++ b/tests/parser/protocol/test_parser_ipv6.py @@ -1,6 +1,7 @@ from microschc.parser.protocol.ipv6 import IPv6Parser, IPv6Fields from microschc.parser.parser import HeaderDescriptor from microschc.rfc8724 import FieldDescriptor +from microschc.binary.buffer import Buffer def test_ipv6_parser_import(): """test: IPv6 header parser import and instanciation @@ -46,51 +47,43 @@ def test_ipv6_parser_parse(): # assert field descriptors match IPv6 header content version_fd:FieldDescriptor = ipv6_header_descriptor.fields[0] assert version_fd.id == IPv6Fields.VERSION - assert version_fd.length == 4 assert version_fd.position == 0 - assert version_fd.value == b'\x06' + assert version_fd.value == Buffer(content=b'\x06', bit_length=4) traffic_class_fd:FieldDescriptor = ipv6_header_descriptor.fields[1] assert traffic_class_fd.id == IPv6Fields.TRAFFIC_CLASS - assert traffic_class_fd.length == 8 assert traffic_class_fd.position == 0 - assert traffic_class_fd.value == b'\x00' + assert traffic_class_fd.value == Buffer(content=b'\x00', bit_length=8) flow_label_fd:FieldDescriptor = ipv6_header_descriptor.fields[2] assert flow_label_fd.id == IPv6Fields.FLOW_LABEL - assert flow_label_fd.length == 20 assert flow_label_fd.position == 0 - assert flow_label_fd.value == b'\x00\x00\x00' + assert flow_label_fd.value == Buffer(content=b'\x00\x00\x00', bit_length=20) payload_length_fd:FieldDescriptor = ipv6_header_descriptor.fields[3] assert payload_length_fd.id == IPv6Fields.PAYLOAD_LENGTH - assert payload_length_fd.length == 16 assert payload_length_fd.position == 0 - assert payload_length_fd.value == b'\x00\x10' + assert payload_length_fd.value == Buffer(content=b'\x00\x10', bit_length=16) next_header_fd:FieldDescriptor = ipv6_header_descriptor.fields[4] assert next_header_fd.id == IPv6Fields.NEXT_HEADER - assert next_header_fd.length == 8 assert next_header_fd.position == 0 - assert next_header_fd.value == b'\x11' + assert next_header_fd.value == Buffer(content=b'\x11', bit_length=8) hop_limit_fd:FieldDescriptor = ipv6_header_descriptor.fields[5] assert hop_limit_fd.id == IPv6Fields.HOP_LIMIT - assert hop_limit_fd.length == 8 assert hop_limit_fd.position == 0 - assert hop_limit_fd.value == b'\x40' + assert hop_limit_fd.value == Buffer(content=b'\x40', bit_length=8) source_address_fd:FieldDescriptor = ipv6_header_descriptor.fields[6] assert source_address_fd.id == IPv6Fields.SRC_ADDRESS - assert source_address_fd.length == 128 assert source_address_fd.position == 0 - assert source_address_fd.value == b'\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa1' + assert source_address_fd.value == Buffer(content=b'\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa1', bit_length=128) destination_address_fd:FieldDescriptor = ipv6_header_descriptor.fields[7] assert destination_address_fd.id == IPv6Fields.DST_ADDRESS - assert destination_address_fd.length == 128 assert destination_address_fd.position == 0 - assert destination_address_fd.value == b'\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa2' + assert destination_address_fd.value == Buffer(content=b'\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa2', bit_length=128) diff --git a/tests/parser/protocol/test_parser_udp.py b/tests/parser/protocol/test_parser_udp.py index a927369..345cdc3 100644 --- a/tests/parser/protocol/test_parser_udp.py +++ b/tests/parser/protocol/test_parser_udp.py @@ -1,6 +1,7 @@ from microschc.parser.protocol.udp import UDPParser, UDPFields from microschc.parser.parser import HeaderDescriptor from microschc.rfc8724 import FieldDescriptor +from microschc.binary.buffer import Buffer def test_ipv6_parser_import(): """test: IPv6 header parser import and instanciation @@ -40,24 +41,20 @@ def test_udp_parser_parse(): # assert field descriptors match UDP header content source_port_fd:FieldDescriptor = udp_header_descriptor.fields[0] assert source_port_fd.id == UDPFields.SOURCE_PORT - assert source_port_fd.length == 16 assert source_port_fd.position == 0 - assert source_port_fd.value == b'\x23\x29' + assert source_port_fd.value == Buffer(content=b'\x23\x29', bit_length=16) destination_port_fd:FieldDescriptor = udp_header_descriptor.fields[1] assert destination_port_fd.id == UDPFields.DESTINATION_PORT - assert destination_port_fd.length == 16 assert destination_port_fd.position == 0 - assert destination_port_fd.value == b'\x23\x2a' + assert destination_port_fd.value == Buffer(content=b'\x23\x2a', bit_length=16) length_fd:FieldDescriptor = udp_header_descriptor.fields[2] assert length_fd.id == UDPFields.LENGTH - assert length_fd.length == 16 assert length_fd.position == 0 - assert length_fd.value == b'\x00\x10' + assert length_fd.value == Buffer(content=b'\x00\x10', bit_length=16) checksum_fd:FieldDescriptor = udp_header_descriptor.fields[3] assert checksum_fd.id == UDPFields.CHECKSUM - assert checksum_fd.length == 16 assert checksum_fd.position == 0 - assert checksum_fd.value == b'\x2d\xa1' + assert checksum_fd.value == Buffer(content=b'\x2d\xa1', bit_length=16) diff --git a/tests/parser/test_parser.py b/tests/parser/test_parser.py index 535b25e..fc6cdac 100644 --- a/tests/parser/test_parser.py +++ b/tests/parser/test_parser.py @@ -4,7 +4,8 @@ from microschc.parser.protocol.ipv6 import IPv6Fields from microschc.parser.protocol.udp import UDPFields from microschc.rfc8724 import DirectionIndicator, HeaderDescriptor, PacketDescriptor -from microschc.rfc8724extras import ParserDefinitions, StacksImplementation +from microschc.rfc8724extras import StacksImplementation +from microschc.binary.buffer import Buffer def test_parser_ipv6_udp_coap(): """ @@ -68,54 +69,55 @@ def test_parser_ipv6_udp_coap(): coap_header: HeaderDescriptor = packet_descriptor.headers[2] assert ipv6_header.fields[0].id == IPv6Fields.VERSION - assert ipv6_header.fields[0].value == b'\x06' + assert ipv6_header.fields[0].value == Buffer(content=b'\x06', bit_length=4) assert ipv6_header.fields[1].id == IPv6Fields.TRAFFIC_CLASS - assert ipv6_header.fields[1].value == b'\x00' + assert ipv6_header.fields[1].value == Buffer(content=b'\x00', bit_length=8) assert ipv6_header.fields[2].id == IPv6Fields.FLOW_LABEL - assert ipv6_header.fields[2].value == b'\x00\xef\x2d' + assert ipv6_header.fields[2].value == Buffer(content=b'\x00\xef\x2d', bit_length=20) assert ipv6_header.fields[3].id == IPv6Fields.PAYLOAD_LENGTH - assert ipv6_header.fields[3].value == b'\x00\x68' + assert ipv6_header.fields[3].value == Buffer(content=b'\x00\x68', bit_length=16) assert ipv6_header.fields[4].id == IPv6Fields.NEXT_HEADER - assert ipv6_header.fields[4].value == b'\x11' + assert ipv6_header.fields[4].value == Buffer(content=b'\x11', bit_length=8) assert ipv6_header.fields[5].id == IPv6Fields.HOP_LIMIT - assert ipv6_header.fields[5].value == b'\x40' + assert ipv6_header.fields[5].value == Buffer(content=b'\x40', bit_length=8) assert ipv6_header.fields[6].id == IPv6Fields.SRC_ADDRESS - assert ipv6_header.fields[6].value == b"\x20\x01\x0d\xb8\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02" + assert ipv6_header.fields[6].value == Buffer(content=b"\x20\x01\x0d\xb8\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02", bit_length=128) assert ipv6_header.fields[7].id == IPv6Fields.DST_ADDRESS - assert ipv6_header.fields[7].value == b"\x20\x01\x0d\xb8\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20" + assert ipv6_header.fields[7].value == Buffer(content=b"\x20\x01\x0d\xb8\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20", bit_length=128) assert udp_header.fields[0].id == UDPFields.SOURCE_PORT - assert udp_header.fields[0].value == b'\xd1\x00' + assert udp_header.fields[0].value == Buffer(content=b'\xd1\x00', bit_length=16) assert udp_header.fields[1].id == UDPFields.DESTINATION_PORT - assert udp_header.fields[1].value == b'\x16\x33' + assert udp_header.fields[1].value == Buffer(content=b'\x16\x33', bit_length=16) assert udp_header.fields[2].id == UDPFields.LENGTH - assert udp_header.fields[2].value == b'\x00\x68' + assert udp_header.fields[2].value == Buffer(content=b'\x00\x68', bit_length=16) assert udp_header.fields[3].id == UDPFields.CHECKSUM - assert udp_header.fields[3].value == b'\x5c\x21' + assert udp_header.fields[3].value == Buffer(content=b'\x5c\x21', bit_length=16) assert coap_header.fields[0].id == CoAPFields.VERSION - assert coap_header.fields[0].value == b'\x01' + assert coap_header.fields[0].value == Buffer(content=b'\x01', bit_length=2) assert coap_header.fields[1].id == CoAPFields.TYPE - assert coap_header.fields[1].value == b'\x02' + assert coap_header.fields[1].value == Buffer(content=b'\x02', bit_length=2) assert coap_header.fields[2].id == CoAPFields.TOKEN_LENGTH - assert coap_header.fields[2].value == b'\x08' + assert coap_header.fields[2].value == Buffer(content=b'\x08', bit_length=4) assert coap_header.fields[3].id == CoAPFields.CODE - assert coap_header.fields[3].value == b'\x45' + assert coap_header.fields[3].value == Buffer(content=b'\x45', bit_length=8) assert coap_header.fields[4].id == CoAPFields.MESSAGE_ID - assert coap_header.fields[4].value == b'\x22\xf6' + assert coap_header.fields[4].value == Buffer(content=b'\x22\xf6', bit_length=16) assert coap_header.fields[5].id == CoAPFields.TOKEN - assert coap_header.fields[5].value == b"\xb8\x30\x0e\xfe\xe6\x62\x91\x22" + assert coap_header.fields[5].value == Buffer(content=b"\xb8\x30\x0e\xfe\xe6\x62\x91\x22", bit_length=64) assert coap_header.fields[6].id == CoAPFields.OPTION_DELTA - assert coap_header.fields[6].value == b'\x0c' + assert coap_header.fields[6].value == Buffer(content=b'\x0c', bit_length=4) assert coap_header.fields[7].id == CoAPFields.OPTION_LENGTH - assert coap_header.fields[7].value == b'\x01' + assert coap_header.fields[7].value == Buffer(content=b'\x01', bit_length=4) assert coap_header.fields[8].id == CoAPFields.OPTION_VALUE - assert coap_header.fields[8].value == b'\x6e' + assert coap_header.fields[8].value == Buffer(content=b'\x6e', bit_length=8) assert coap_header.fields[9].id == CoAPFields.PAYLOAD_MARKER - assert coap_header.fields[9].value == b'\xff' + assert coap_header.fields[9].value == Buffer(content=b'\xff', bit_length=8) - assert packet_descriptor.payload == b"\x5b\x7b\x22\x62\x6e\x22\x3a\x22\x2f\x36\x2f\x22\x2c\x22\x6e\x22" \ - b"\x3a\x22\x30\x2f\x30\x22\x2c\x22\x76\x22\x3a\x35\x34\x2e\x30\x7d" \ - b"\x2c\x7b\x22\x6e\x22\x3a\x22\x30\x2f\x31\x22\x2c\x22\x76\x22\x3a" \ - b"\x34\x38\x2e\x30\x7d\x2c\x7b\x22\x6e\x22\x3a\x22\x30\x2f\x35\x22" \ - b"\x2c\x22\x76\x22\x3a\x31\x36\x36\x36\x32\x36\x33\x33\x33\x39\x7d\x5d" \ No newline at end of file + payload_content: bytes = b"\x5b\x7b\x22\x62\x6e\x22\x3a\x22\x2f\x36\x2f\x22\x2c\x22\x6e\x22" \ + b"\x3a\x22\x30\x2f\x30\x22\x2c\x22\x76\x22\x3a\x35\x34\x2e\x30\x7d" \ + b"\x2c\x7b\x22\x6e\x22\x3a\x22\x30\x2f\x31\x22\x2c\x22\x76\x22\x3a" \ + b"\x34\x38\x2e\x30\x7d\x2c\x7b\x22\x6e\x22\x3a\x22\x30\x2f\x35\x22" \ + b"\x2c\x22\x76\x22\x3a\x31\x36\x36\x36\x32\x36\x33\x33\x33\x39\x7d\x5d" + assert packet_descriptor.payload == Buffer(content=payload_content, bit_length=8*len(payload_content)) \ No newline at end of file diff --git a/tests/ruler/test_ruler.py b/tests/ruler/test_ruler.py index 20b579c..3c598fa 100644 --- a/tests/ruler/test_ruler.py +++ b/tests/ruler/test_ruler.py @@ -1,10 +1,11 @@ from typing import List +from microschc.binary.buffer import Buffer, Padding from microschc.parser.factory import factory from microschc.parser.parser import PacketParser from microschc.parser.protocol.coap import CoAPFields from microschc.parser.protocol.ipv6 import IPv6Fields from microschc.parser.protocol.udp import UDPFields -from microschc.rfc8724 import CompressionDecompressionAction, DirectionIndicator, FieldDescriptor, MatchMapping, MatchingOperator, PacketDescriptor, Pattern, RuleDescriptor, RuleFieldDescriptor +from microschc.rfc8724 import CompressionDecompressionAction, DirectionIndicator, FieldDescriptor, MatchMapping, MatchingOperator, PacketDescriptor, RuleDescriptor, RuleFieldDescriptor from microschc.rfc8724 import CompressionDecompressionAction as CDA from microschc.rfc8724 import MatchingOperator as MO from microschc.rfc8724extras import ParserDefinitions, StacksImplementation @@ -13,14 +14,14 @@ def test_ruler_field_match(): # test the `ignore` matching-operator on a matching field with same ID and same length - packet_field: FieldDescriptor = FieldDescriptor(id='field-1', length=16, position=0, value=b'\xff\xff') + packet_field: FieldDescriptor = FieldDescriptor(id='field-1', position=0, value=Buffer(content=b'\xff\xff', bit_length=16)) rule_field_ignore: RuleFieldDescriptor = RuleFieldDescriptor( id='field-1', length=16, position=0, direction=DirectionIndicator.UP, - target_value=b'', + target_value=Buffer(content=b'', bit_length=16), matching_operator=MatchingOperator.IGNORE, compression_decompression_action=CDA.NOT_SENT ) @@ -32,7 +33,7 @@ def test_ruler_field_match(): length=16, position=0, direction=DirectionIndicator.UP, - target_value=b'\xff\xff', + target_value=Buffer(content=b'\xff\xff', bit_length=16), matching_operator=MatchingOperator.EQUAL, compression_decompression_action=CDA.NOT_SENT ) @@ -40,19 +41,19 @@ def test_ruler_field_match(): # test the `equal` matching-operator on a matching field with Same ID, same value representation but # different length - packet_field: FieldDescriptor = FieldDescriptor(id='field-1', length=15, position=0, value=b'\x7f\xff') + packet_field: FieldDescriptor = FieldDescriptor(id='field-1', position=0, value=Buffer(content=b'\x7f\xff', bit_length=15)) rule_field_equal: RuleFieldDescriptor = RuleFieldDescriptor( id='field-1', - length=16, + length=15, position=0, direction=DirectionIndicator.UP, - target_value=b'\7f\xff', + target_value=Buffer(content=b'\7f\xff', bit_length=16), matching_operator=MatchingOperator.EQUAL, compression_decompression_action=CDA.NOT_SENT ) assert _field_match(packet_field=packet_field, rule_field=rule_field_equal) == False -def test_rule_match(): +def test_match_packet_descriptor(): valid_stack_packet:bytes = bytes( b"\x60\x00\xef\x2d\x00\x68\x11\x40\x20\x01\x0d\xb8\x00\x0a\x00\x00" \ b"\x00\x00\x00\x00\x00\x00\x00\x02\x20\x01\x0d\xb8\x00\x0a\x00\x00" \ @@ -72,62 +73,89 @@ def test_rule_match(): field_descriptors_1: List[RuleFieldDescriptor] = [ RuleFieldDescriptor( id=IPv6Fields.VERSION, length=4, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'\x06', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x06', bit_length=4), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor( id=IPv6Fields.TRAFFIC_CLASS, length=8, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'\x00', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x00', bit_length=8), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor( id=IPv6Fields.FLOW_LABEL, length=20, position=0, direction=DirectionIndicator.UP, - target_value=b'\x00\xef\x2d', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x00\xef\x2d', bit_length=20), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor( id=IPv6Fields.PAYLOAD_LENGTH, length=16, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=16), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor( id=IPv6Fields.NEXT_HEADER, length=8, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'\x11', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x11', bit_length=8), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor( id=IPv6Fields.HOP_LIMIT, length=8, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'\x40', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(b'\x40', bit_length=8), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor( id=IPv6Fields.SRC_ADDRESS, length=128, position=0, direction=DirectionIndicator.UP, - target_value=Pattern(pattern=b'\x20\x01\x0d\xb8\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00', length=120), + target_value=Buffer(content=b'\x20\x01\x0d\xb8\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00', bit_length=120), matching_operator=MO.MSB, compression_decompression_action=CDA.LSB), - RuleFieldDescriptor(id=IPv6Fields.DST_ADDRESS, length=128, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=MatchMapping(index_length=2, forward_mapping={b"\x20\x01\x0d\xb8\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20":0}), + RuleFieldDescriptor( + id=IPv6Fields.DST_ADDRESS, length=128, position=0, direction=DirectionIndicator.BIDIRECTIONAL, + target_value=MatchMapping(forward_mapping={ + Buffer(content=b"\x20\x01\x0d\xb8\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20", bit_length=128):Buffer(content=b'\x00', bit_length=2) + }), matching_operator=MO.MATCH_MAPPING, compression_decompression_action=CDA.MAPPING_SENT), RuleFieldDescriptor(id=UDPFields.SOURCE_PORT, length=16, position=0, direction=DirectionIndicator.UP, - target_value=b'\xd1\x00', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\xd1\x00', bit_length=16), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor(id=UDPFields.DESTINATION_PORT, length=16, position=0, direction=DirectionIndicator.UP, - target_value=b'\x16\x33', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x16\x33', bit_length=16), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor(id=UDPFields.LENGTH, length=16, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=UDPFields.CHECKSUM, length=16, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'',bit_length=16), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=CoAPFields.VERSION, length=2, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'\x01', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x01', bit_length=2), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor(id=CoAPFields.TYPE, length=2, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'\x02', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x02', bit_length=2), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor(id=CoAPFields.TOKEN_LENGTH, length=4, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=4), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=CoAPFields.CODE, length=8, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=8), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=CoAPFields.MESSAGE_ID, length=16, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=16), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=CoAPFields.TOKEN, length=64, position=0, direction=DirectionIndicator.BIDIRECTIONAL, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=64), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=CoAPFields.OPTION_DELTA, length=4, position=0, direction=DirectionIndicator.UP, - target_value=b'\x0c', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), + target_value=Buffer(content=b'\x0c', bit_length=4), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT), RuleFieldDescriptor(id=CoAPFields.OPTION_LENGTH, length=4, position=0, direction=DirectionIndicator.UP, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=4), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=CoAPFields.OPTION_VALUE, length=0, position=0, direction=DirectionIndicator.UP, - target_value=b'', matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), + target_value=Buffer(content=b'', bit_length=0), matching_operator=MO.IGNORE, compression_decompression_action=CDA.VALUE_SENT), RuleFieldDescriptor(id=CoAPFields.PAYLOAD_MARKER, length=8, position=0, direction=DirectionIndicator.UP, - target_value=b'\xff', matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT) + target_value=Buffer(content=b'\xff', bit_length=8), matching_operator=MO.EQUAL, compression_decompression_action=CDA.NOT_SENT) ] - rule_descriptor_1: RuleDescriptor = RuleDescriptor(id=b'\x00', id_length=2, field_descriptors=field_descriptors_1) - default_rule_descriptor: RuleDescriptor = RuleDescriptor(id=b'\x01', id_length=2, field_descriptors=[]) + rule_descriptor_1: RuleDescriptor = RuleDescriptor(id=Buffer(content=b'\x00', bit_length=2), field_descriptors=field_descriptors_1) + default_rule_descriptor: RuleDescriptor = RuleDescriptor(id=Buffer(content=b'\x01', bit_length=2), field_descriptors=[]) ruler: Ruler = Ruler(rules_descriptors=[rule_descriptor_1, default_rule_descriptor]) - assert ruler.match(packet_descriptor=packet_descriptor).id == rule_descriptor_1.id \ No newline at end of file + assert ruler.match_packet_descriptor(packet_descriptor=packet_descriptor).id == rule_descriptor_1.id + +def test_match_schc_packet(): + schc_packet:Buffer = Buffer(content= b'\xc0\x1a\x00\x80\x06\x85\xc2\x18\x45\x22\xf6\xf4' \ + b'\x0b\x83\x00\xef\xee\x66\x29\x12\x21\x86\xe5\xb7' \ + b'\xb2\x26\x26\xe2\x23\xa2\x22\xf3\x62\xf2\x22\xc2' \ + b'\x26\xe2\x23\xa2\x23\x02\xf3\x02\x22\xc2\x27\x62' \ + b'\x23\xa3\x53\x42\xe3\x07\xd2\xc7\xb2\x26\xe2\x23' \ + b'\xa2\x23\x02\xf3\x12\x22\xc2\x27\x62\x23\xa3\x43' \ + b'\x82\xe3\x07\xd2\xc7\xb2\x26\xe2\x23\xa2\x23\x02' \ + b'\xf3\x52\x22\xc2\x27\x62\x23\xa3\x13\x63\x63\x63' \ + b'\x23\x63\x33\x33\x33\x97\xd5\xd0', + bit_length=828, padding=Padding.RIGHT) + + rule_descriptor_0: RuleDescriptor = RuleDescriptor(id=Buffer(content=b'\x02', bit_length=2), field_descriptors=[]) + rule_descriptor_1: RuleDescriptor = RuleDescriptor(id=Buffer(content=b'\x03', bit_length=2), field_descriptors=[]) + rule_descriptor_2: RuleDescriptor = RuleDescriptor(id=Buffer(content=b'\x01', bit_length=2), field_descriptors=[]) + + default_rule_descriptor: RuleDescriptor = RuleDescriptor(id=Buffer(content=b'\x00', bit_length=2), field_descriptors=[]) + + ruler: Ruler = Ruler(rules_descriptors=[rule_descriptor_0, rule_descriptor_1, rule_descriptor_2, default_rule_descriptor]) + + matching_rule_descriptor = ruler.match_schc_packet(schc_packet=schc_packet) + + assert matching_rule_descriptor.id == rule_descriptor_1.id \ No newline at end of file diff --git a/tests/test_microschc.py b/tests/test_microschc.py index fcbad1e..1cc6d44 100644 --- a/tests/test_microschc.py +++ b/tests/test_microschc.py @@ -2,4 +2,4 @@ def test_version(): - assert __version__ == '0.5.0' + assert __version__ == '0.7.0'