diff --git a/meterbus/core_objects.py b/meterbus/core_objects.py index c843df4..60ec855 100644 --- a/meterbus/core_objects.py +++ b/meterbus/core_objects.py @@ -1,5 +1,5 @@ from enum import Enum - +from typing import Tuple, Dict, Union class MeasureUnit(Enum): KWH = "kWh" @@ -219,7 +219,7 @@ class VIFUnitEnhExt(Enum): class VIFTable(object): # Primary VIFs (main table), range 0x00 - 0xFF - lut = { + lut: Dict[int, Tuple[float, Union[str, MeasureUnit], Union[str, VIFUnit, VIFUnitExt, VIFUnitSecExt]]] = { # E000 0nnn Energy Wh (0.001Wh to 10000Wh) 0x00: (1.0e-3, MeasureUnit.WH, VIFUnit.ENERGY_WH), 0x01: (1.0e-2, MeasureUnit.WH, VIFUnit.ENERGY_WH), diff --git a/meterbus/telegram_body.py b/meterbus/telegram_body.py index cf489a6..f46cd1f 100644 --- a/meterbus/telegram_body.py +++ b/meterbus/telegram_body.py @@ -5,10 +5,12 @@ from .telegram_variable_data_record import TelegramVariableDataRecord from .value_information_block import ValueInformationBlock from .exceptions import MBusFrameDecodeError +from decimal import Decimal +from typing import Any, Dict, List, Optional, Union class TelegramBodyPayload(object): - def __init__(self, payload=None, parent=None): + def __init__(self, payload: None=None, parent: Optional[TelegramBody]=None) -> None: self._body = TelegramField() if payload is not None: self._body = TelegramField(payload) @@ -33,17 +35,17 @@ def body(self, value): self._body = TelegramField(value) @property - def interpreted(self): + def interpreted(self) -> List[Dict[str, Union[Decimal, str, int]]]: return [r.interpreted for r in self.records] - def load(self, payload): + def load(self, payload: List[Union[Any, int]]) -> None: self.body = payload self.parse() def set_payload(self, payload): self.body = payload - def parse(self): + def parse(self) -> None: self.records = [] recordPos = 0 @@ -53,7 +55,7 @@ def parse(self): except IndexError: raise - def _parse_variable_data_rec(self, startPos): + def _parse_variable_data_rec(self, startPos: int) -> int: lowerBoundary = 0 upperBoundary = 0 @@ -182,7 +184,7 @@ class TelegramBodyHeader(object): CI_VARIABLE_DATA = [0x72, 0x76, 0x78] CI_FIXED_DATA = [0x73, 0x77] - def __init__(self): + def __init__(self) -> None: self._ci_field = TelegramField() # control information field self._id_nr_field = TelegramField() # identification number field self._manufacturer_field = TelegramField() # manufacturer @@ -192,7 +194,7 @@ def __init__(self): self._status_field = TelegramField() # status self._sig_field = TelegramField() # signature field - def load(self, bodyHeader): + def load(self, bodyHeader: List[int]) -> None: if len(bodyHeader) == 1: self.ci_field = bodyHeader[0] else: @@ -220,20 +222,20 @@ def load(self, bodyHeader): self.sig_field.parts.reverse() @property - def id_nr(self): + def id_nr(self) -> List[int]: """ID number of telegram in reverse byte order""" return self._id_nr_field[::-1] @property - def isLSBOrder(self): + def isLSBOrder(self) -> bool: return not (self._ci_field.parts[0] & self.MODE_BIT_MASK) @property - def noDataHeader(self): + def noDataHeader(self) -> bool: return (self._ci_field.parts and self._ci_field.parts[0] == 0x78) @property - def isVariableData(self): + def isVariableData(self) -> bool: return (self._ci_field.parts[0] in self.CI_VARIABLE_DATA) @property @@ -305,7 +307,7 @@ def sig_field(self, value): self._sig_field = TelegramField(value) @property - def interpreted(self): + def interpreted(self) -> Dict[str, Union[str, int]]: if self.noDataHeader: return { 'type': hex(self.ci_field.parts[0]) @@ -327,13 +329,13 @@ def to_JSON(self): class TelegramBody(object): - def __init__(self): + def __init__(self) -> None: self._bodyHeader = TelegramBodyHeader() self._bodyPayload = TelegramBodyPayload(parent=self) self._bodyHeaderLength = 13 @property - def isVariableData(self): + def isVariableData(self) -> bool: return self._bodyHeader.isVariableData @property @@ -341,7 +343,7 @@ def isFixedData(self): return self._bodyHeader.isFixedData @property - def noDataHeader(self): + def noDataHeader(self) -> bool: return (self._bodyHeader.noDataHeader) @property @@ -376,13 +378,13 @@ def more_records_follow(self): return self._bodyPayload.more_records_follow @property - def interpreted(self): + def interpreted(self) -> Dict[str, Union[Dict[str, Union[str, int]], List[Union[Dict[str, Union[str, int]], Dict[str, Union[Decimal, str, int]]]]]]: return { 'header': self.bodyHeader.interpreted, 'records': self.bodyPayload.interpreted, } - def load(self, body): + def load(self, body: List[int]) -> None: self.bodyHeader = body[0:self.bodyHeaderLength] self.bodyPayload.load(body[self.bodyHeaderLength:]) diff --git a/meterbus/telegram_control.py b/meterbus/telegram_control.py index 94467e2..0fd6e7b 100644 --- a/meterbus/telegram_control.py +++ b/meterbus/telegram_control.py @@ -3,10 +3,11 @@ from .exceptions import MBusFrameCRCError, MBusFrameDecodeError, MBusFrameEncodeError, FrameMismatch from .defines import * +from typing import Iterator, List, Optional, Union class TelegramControl(object): @staticmethod - def parse(data): + def parse(data: Optional[List[int]]) -> "TelegramControl": if data is None: raise MBusFrameDecodeError("Data is None") @@ -18,7 +19,7 @@ def parse(data): return TelegramControl(data) - def __init__(self, dbuf=None): + def __init__(self, dbuf: Optional[Union[str, bytes, List[int]]]=None) -> None: self._header = TelegramHeader() self._body = TelegramBody() @@ -113,15 +114,15 @@ def body(self): def body(self, value): self._body = value - def compute_crc(self): + def compute_crc(self) -> int: return (self.header.cField.parts[0] + self.header.aField.parts[0] + self.body.bodyHeader.ci_field.parts[0]) % 256 - def check_crc(self): + def check_crc(self) -> bool: return self.compute_crc() == self.header.crcField.parts[0] - def __len__(self): + def __len__(self) -> int: return ( len(self.header.startField.parts) * 2 + len(self.header.lField.parts) * 2 + @@ -132,7 +133,7 @@ def __len__(self): len(self.header.stopField.parts) ) - def __iter__(self): + def __iter__(self) -> Iterator[int]: yield self.header.startField.parts[0] yield len(self.body.bodyHeader.ci_field.parts) + 2 yield len(self.body.bodyHeader.ci_field.parts) + 2 diff --git a/meterbus/telegram_field.py b/meterbus/telegram_field.py index 014fea1..14e0c50 100644 --- a/meterbus/telegram_field.py +++ b/meterbus/telegram_field.py @@ -3,11 +3,12 @@ from builtins import (bytes, str, open, super, range, zip, round, input, int, pow, object) +from typing import List, Optional, Union class TelegramField(object): - def __init__(self, parts=None): - self._parts = [] + def __init__(self, parts: Optional[Union[int, List[int]]]=None) -> None: + self._parts: List[int] = [] if parts is not None: if isinstance(parts, str): @@ -20,7 +21,7 @@ def __init__(self, parts=None): self.parts += [parts] @property - def decodeInt(self): + def decodeInt(self) -> int: int_data = self.parts value = 0 neg = int_data[-1] & 0x80 @@ -40,7 +41,7 @@ def decodeInt(self): return value @property - def decodeBCD(self): + def decodeBCD(self) -> int: bcd_data = self.parts val = 0 @@ -64,7 +65,7 @@ def decodeReal(self): return struct.unpack('f', bytes(real_data))[0] @property - def decodeManufacturer(self): + def decodeManufacturer(self) -> str: m_id = self.decodeInt return "{0}{1}{2}".format( chr(((m_id >> 10) & 0x001F) + 64), @@ -72,15 +73,15 @@ def decodeManufacturer(self): chr(((m_id) & 0x001F) + 64)) @property - def decodeASCII(self): + def decodeASCII(self) -> str: return "".join(map(chr, reversed(self.parts))) @property - def decodeRAW(self): + def decodeRAW(self) -> str: return " ".join(map(lambda x: "%02X" % x, self.parts)) @property - def decodeDate(self): + def decodeDate(self) -> str: return DateCalculator.getDate( self.parts[0], self.parts[1], False) @@ -144,8 +145,8 @@ def __str__(self): return " ".join( [hex(x).replace('0x', '').zfill(2) for x in self.parts]) - def __getitem__(self, key): + def __getitem__(self, key: slice) -> List[int]: return self.parts[key] - def __len__(self): + def __len__(self) -> int: return len(self.parts) diff --git a/meterbus/telegram_header.py b/meterbus/telegram_header.py index f8d150f..cec66f8 100644 --- a/meterbus/telegram_header.py +++ b/meterbus/telegram_header.py @@ -1,9 +1,10 @@ import simplejson as json from .telegram_field import TelegramField +from typing import Dict, List, Union class TelegramHeader(object): - def __init__(self): + def __init__(self) -> None: self._startField = TelegramField([0x68]) self._lField = TelegramField([0x00]) self._cField = TelegramField() @@ -15,11 +16,11 @@ def __init__(self): self._headerLengthCRCStop = 8 @property - def headerLength(self): + def headerLength(self) -> int: return self._headerLength @property - def headerLengthCRCStop(self): + def headerLengthCRCStop(self) -> int: return self._headerLengthCRCStop @property @@ -71,7 +72,7 @@ def stopField(self, value): self._stopField = TelegramField(value) @property - def interpreted(self): + def interpreted(self) -> Dict[str, str]: return { 'start': hex(self.startField.parts[0]), 'length': hex(self.lField.parts[0]), @@ -81,7 +82,7 @@ def interpreted(self): 'stop': hex(self.stopField.parts[0]) } - def load(self, hat): + def load(self, hat: Union[str, List[int]]) -> None: header = hat if isinstance(hat, str): header = list(map(ord, hat)) @@ -102,6 +103,6 @@ def load(self, hat): self.crcField = header[-2] self.stopField = header[-1] - def to_JSON(self): + def to_JSON(self) -> str: return json.dumps(self.interpreted, sort_keys=False, indent=4, use_decimal=True) diff --git a/meterbus/telegram_long.py b/meterbus/telegram_long.py index 01b4979..b15ae80 100644 --- a/meterbus/telegram_long.py +++ b/meterbus/telegram_long.py @@ -4,11 +4,14 @@ from .telegram_header import TelegramHeader from .exceptions import (MBusFrameCRCError, MBusFrameDecodeError, FrameMismatch, MbusFrameLengthError) +from decimal import Decimal +from meterbus.telegram_variable_data_record import TelegramVariableDataRecord +from typing import Dict, Iterator, List, Optional, Union class TelegramLong(object): @staticmethod - def parse(data): + def parse(data: Optional[List[int]]) -> "TelegramLong": if data is None: raise MBusFrameDecodeError("Data is None") @@ -20,7 +23,7 @@ def parse(data): return TelegramLong(data) - def __init__(self, dbuf=None): + def __init__(self, dbuf: Optional[List[int]]=None) -> None: self._header = TelegramHeader() self._body = TelegramBody() @@ -92,7 +95,7 @@ def body(self, value): self._body.load(value) @property - def records(self): + def records(self) -> List[TelegramVariableDataRecord]: """Alias property for easy access to records""" return self.body.bodyPayload.records @@ -105,7 +108,7 @@ def more_records_follow(self): return False @property - def interpreted(self): + def interpreted(self) -> Dict[str, Dict[str, Union[str, Dict[str, Union[str, int]], List[Union[Dict[str, Union[str, int]], Dict[str, Union[Decimal, str, int]]]]]]]: return { 'head': self.header.interpreted, 'body': self.body.interpreted @@ -129,7 +132,7 @@ def load(self, tgr): # def parse(self): # self.body.parse() - def compute_crc(self): + def compute_crc(self) -> int: return (self.header.cField.parts[0] + self.header.aField.parts[0] + sum(self.body.bodyHeader.ci_field.parts) + @@ -142,14 +145,14 @@ def compute_crc(self): sum(self.body.bodyHeader.sig_field.parts) + sum(self.body.bodyPayload.body.parts)) % 256 - def check_crc(self): + def check_crc(self) -> bool: return self.compute_crc() == self.header.crcField.parts[0] - def to_JSON(self): + def to_JSON(self) -> str: return json.dumps(self.interpreted, sort_keys=True, indent=4, use_decimal=True) - def __len__(self): + def __len__(self) -> int: return ( len(self.header.startField.parts) * 2 + len(self.header.lField.parts) * 2 + @@ -168,7 +171,7 @@ def __len__(self): len(self.header.stopField.parts) ) - def __iter__(self): + def __iter__(self) -> Iterator[int]: self.header.lField = [ len(self.header.cField.parts) + len(self.header.aField.parts) + diff --git a/meterbus/telegram_short.py b/meterbus/telegram_short.py index 2dcb74a..13ea7c7 100644 --- a/meterbus/telegram_short.py +++ b/meterbus/telegram_short.py @@ -1,11 +1,12 @@ from .defines import * from .telegram_header import TelegramHeader from .exceptions import MBusFrameDecodeError, MBusFrameCRCError, FrameMismatch +from typing import Iterator, List, Optional, Union class TelegramShort(object): @staticmethod - def parse(data): + def parse(data: Optional[List[int]]) -> "TelegramShort": if data is None: raise MBusFrameDecodeError("Data is None") @@ -17,7 +18,7 @@ def parse(data): return TelegramShort(data) - def __init__(self, dbuf=None): + def __init__(self, dbuf: Optional[Union[str, bytes, List[int]]]=None) -> None: self._header = TelegramHeader() if dbuf is not None: tgr = dbuf @@ -48,17 +49,17 @@ def header(self): def header(self, value): self._header = value - def compute_crc(self): + def compute_crc(self) -> int: return (self.header.cField.parts[0] + self.header.aField.parts[0]) % 256 - def check_crc(self): + def check_crc(self) -> bool: return self.compute_crc() == self.header.crcField.parts[0] - def __len__(self): + def __len__(self) -> int: return 0x05 - def __iter__(self): + def __iter__(self) -> Iterator[int]: yield self._header.startField.parts[0] yield self._header.cField.parts[0] yield self._header.aField.parts[0] diff --git a/meterbus/telegram_variable_data_record.py b/meterbus/telegram_variable_data_record.py index 13dee8e..2423398 100644 --- a/meterbus/telegram_variable_data_record.py +++ b/meterbus/telegram_variable_data_record.py @@ -9,32 +9,36 @@ from .value_information_block import ValueInformationBlock from .data_information_block import DataInformationBlock -from .core_objects import VIFTable, VIFUnit, VIFUnitEnhExt, DataEncoding, MeasureUnit +from .core_objects import VIFTable, VIFUnit, VIFUnitEnhExt, DataEncoding, MeasureUnit, VIFUnitExt, VIFUnitSecExt +from typing import Any, Dict, Optional, Union, Tuple class TelegramVariableDataRecord(object): UNIT_MULTIPLIER_MASK = 0x7F # 0111 1111 EXTENSION_BIT_MASK = 0x80 # 1000 0000 - def __init__(self): + def __init__(self) -> None: self.dib = DataInformationBlock() self.vib = ValueInformationBlock() self._dataField = TelegramField() @property - def dataField(self): + def dataField(self) -> TelegramField: return self._dataField @dataField.setter - def dataField(self, value): + def dataField(self, value: TelegramField) -> None: + assert isinstance(value, TelegramField) self._dataField = value @property - def more_records_follow(self): + def more_records_follow(self) -> bool: return self.dib.more_records_follow and self.dib.is_eoud - def _parse_vifx(self): + def _parse_vifx(self) -> Tuple[Union[None, int, float], Union[None, str, MeasureUnit], Union[None, str, VIFUnit, VIFUnitExt, VIFUnitSecExt], Union[None, VIFUnitEnhExt]]: + """ + """ if len(self.vib.parts) == 0: return None, None, None, None @@ -58,16 +62,17 @@ def _parse_vifx(self): # from 0xFC # if vif & vtf_ebm: code = vife[0] & self.UNIT_MULTIPLIER_MASK - factor = 1 - if 0x70 <= code <= 0x77: - factor = pow(10.0, (vife[0] & 0x07) - 6) - elif 0x78 <= code <= 0x7B: - factor = pow(10.0, (vife[0] & 0x03) - 3) - elif code == 0x7D: - factor = 1 + def factor()-> int: + if 0x70 <= code <= 0x77: + return pow(10.0, (vife[0] & 0x07) - 6) + if 0x78 <= code <= 0x7B: + return pow(10.0, (vife[0] & 0x03) - 3) + if code == 0x7D: + return 1 + return 1 - return (factor, self.vib.customVIF.decodeASCII, + return (factor(), self.vib.customVIF.decodeASCII, VIFUnit.VARIABLE_VIF, None) # // custom VIF @@ -89,33 +94,31 @@ def _parse_vifx(self): ) @property - def unit(self): + def unit(self) -> Optional[str]: _, unit, _, _ = self._parse_vifx() if isinstance(unit, MeasureUnit): return unit.value return unit @property - def value(self): + def value(self) -> Union[str, decimal.Decimal]: value = self.parsed_value - if type(value) == str and all(ord(c) < 128 for c in value): - value = str(value) + if isinstance(value, str): + if all(ord(c) < 128 for c in value): + str(value) - elif type(value) == str: - try: + elif isinstance(value, bytes): value = value.decode('unicode_escape') - except AttributeError: - pass return value @property - def function(self): + def function(self) -> int: func = self.dib.function_type return func.value @property - def parsed_value(self): + def parsed_value(self) -> Optional[Union[str, int, decimal.Decimal]]: mult, unit, _, _ = self._parse_vifx() length, enc = self.dib.length_encoding @@ -152,20 +155,29 @@ def parsed_value(self): not all(chr(c) in string.printable for c in tdf.parts)): return tdf.decodeRAW - return { - te.ENCODING_INTEGER: lambda: int( - tdf.decodeInt * mult) if mult > 1.0 else decimal.Decimal( - tdf.decodeInt * mult), - te.ENCODING_BCD: lambda: decimal.Decimal( - tdf.decodeBCD * mult), - te.ENCODING_REAL: lambda: decimal.Decimal( - tdf.decodeReal * mult), - te.ENCODING_VARIABLE_LENGTH: lambda: tdf.decodeASCII, - te.ENCODING_NULL: lambda: None - }.get(enc, lambda: None)() + def encode() -> Union[None, str, int, decimal.Decimal]: + if enc == te.ENCODING_INTEGER: + assert mult is not None + if mult > 1.0: + # TODO: If 'mult' is for example 1.1, why should the result be rounded to an int? + return int(tdf.decodeInt * mult) + return decimal.Decimal(tdf.decodeInt * mult) + if enc == te.ENCODING_BCD: + assert mult is not None + return decimal.Decimal(tdf.decodeBCD * mult) + if enc == te.ENCODING_REAL: + assert mult is not None + return decimal.Decimal(tdf.decodeReal * mult) + if enc == te.ENCODING_VARIABLE_LENGTH: + return tdf.decodeASCII + if enc == te.ENCODING_NULL: + return None + return None + + return encode() @property - def interpreted(self): + def interpreted(self) -> Dict[str, Union[ decimal.Decimal, str, int]]: _, unit, typ, unit_enh = self._parse_vifx() storage_number, tariff, device = self.dib.parse_dife() @@ -203,6 +215,6 @@ def interpreted(self): return record - def to_JSON(self): + def to_JSON(self) -> str: return json.dumps(self.interpreted, sort_keys=True, indent=4, use_decimal=True) diff --git a/meterbus/value_information_block.py b/meterbus/value_information_block.py index 493b059..159f6cd 100644 --- a/meterbus/value_information_block.py +++ b/meterbus/value_information_block.py @@ -1,11 +1,12 @@ from .telegram_field import TelegramField +from typing import List, Optional class ValueInformationBlock(TelegramField): EXTENSION_BIT_MASK = 0x80 # 1000 0000 WITHOUT_EXTENSION_BIT_MASK = 0x7F # 0111 1111 - def __init__(self, parts=None): + def __init__(self, parts: Optional[List[int]]=None) -> None: super(ValueInformationBlock, self).__init__(parts) self._custom_vif = TelegramField() @@ -18,21 +19,21 @@ def customVIF(self, value): self._custom_vif = value @property - def has_extension_bit(self): + def has_extension_bit(self) -> bool: try: return (self.parts[-1] & self.EXTENSION_BIT_MASK) > 0 except IndexError: return False @property - def without_extension_bit(self): + def without_extension_bit(self) -> bool: try: return (self.parts[0] & self.WITHOUT_EXTENSION_BIT_MASK) == 0x7C except IndexError: return False @property - def has_lvar_bit(self): + def has_lvar_bit(self) -> bool: """returns true if first VIFE has LVAR set""" try: return (self.parts[1] & self.EXTENSION_BIT_MASK) > 0 diff --git a/meterbus/wtelegram_snd_nr.py b/meterbus/wtelegram_snd_nr.py index 8fc5990..c8acc16 100644 --- a/meterbus/wtelegram_snd_nr.py +++ b/meterbus/wtelegram_snd_nr.py @@ -6,11 +6,12 @@ from .wtelegram_header import WTelegramHeader from .wtelegram_body import WTelegramFrame from .exceptions import MBusFrameDecodeError, MBusFrameCRCError, FrameMismatch +from typing import List class WTelegramSndNr(WTelegramFrame): @staticmethod - def parse(data): + def parse(data: List[int]): try: if data[1] != 0x44: # SND-NR raise FrameMismatch() diff --git a/pyproject.toml b/pyproject.toml index 5e20f19..495ca10 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,3 +47,15 @@ exclude = ["tests", "examples"] [tool.setuptools.dynamic] version = { attr = "meterbus.__version__" } + +# +# MPY tuning - this command should succeed: mypy -p meterbus +# +[[tool.mypy.overrides]] +module = ["simplejson.*","Crypto.Cipher.*"] +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = ["yaml.*", "serial.*"] +follow_untyped_imports = true +