Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Use monkeytype to apply typehints #44

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions meterbus/core_objects.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum

from typing import Tuple, Dict, Union

class MeasureUnit(Enum):
KWH = "kWh"
Expand Down Expand Up @@ -219,7 +219,7 @@ class VIFUnitEnhExt(Enum):
class VIFTable(object):
# Primary VIFs (main table), range 0x00 - 0xFF

lut = {
lut: Dict[int, Tuple[float, Union[str, MeasureUnit], Union[str, VIFUnit, VIFUnitExt, VIFUnitSecExt]]] = {
# E000 0nnn Energy Wh (0.001Wh to 10000Wh)
0x00: (1.0e-3, MeasureUnit.WH, VIFUnit.ENERGY_WH),
0x01: (1.0e-2, MeasureUnit.WH, VIFUnit.ENERGY_WH),
Expand Down
36 changes: 19 additions & 17 deletions meterbus/telegram_body.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
from .telegram_variable_data_record import TelegramVariableDataRecord
from .value_information_block import ValueInformationBlock
from .exceptions import MBusFrameDecodeError
from decimal import Decimal
from typing import Any, Dict, List, Optional, Union


class TelegramBodyPayload(object):
def __init__(self, payload=None, parent=None):
def __init__(self, payload: None=None, parent: Optional[TelegramBody]=None) -> None:
self._body = TelegramField()
if payload is not None:
self._body = TelegramField(payload)
Expand All @@ -33,17 +35,17 @@ def body(self, value):
self._body = TelegramField(value)

@property
def interpreted(self):
def interpreted(self) -> List[Dict[str, Union[Decimal, str, int]]]:
return [r.interpreted for r in self.records]

def load(self, payload):
def load(self, payload: List[Union[Any, int]]) -> None:
self.body = payload
self.parse()

def set_payload(self, payload):
self.body = payload

def parse(self):
def parse(self) -> None:
self.records = []
recordPos = 0

Expand All @@ -53,7 +55,7 @@ def parse(self):
except IndexError:
raise

def _parse_variable_data_rec(self, startPos):
def _parse_variable_data_rec(self, startPos: int) -> int:
lowerBoundary = 0
upperBoundary = 0

Expand Down Expand Up @@ -182,7 +184,7 @@ class TelegramBodyHeader(object):
CI_VARIABLE_DATA = [0x72, 0x76, 0x78]
CI_FIXED_DATA = [0x73, 0x77]

def __init__(self):
def __init__(self) -> None:
self._ci_field = TelegramField() # control information field
self._id_nr_field = TelegramField() # identification number field
self._manufacturer_field = TelegramField() # manufacturer
Expand All @@ -192,7 +194,7 @@ def __init__(self):
self._status_field = TelegramField() # status
self._sig_field = TelegramField() # signature field

def load(self, bodyHeader):
def load(self, bodyHeader: List[int]) -> None:
if len(bodyHeader) == 1:
self.ci_field = bodyHeader[0]
else:
Expand Down Expand Up @@ -220,20 +222,20 @@ def load(self, bodyHeader):
self.sig_field.parts.reverse()

@property
def id_nr(self):
def id_nr(self) -> List[int]:
"""ID number of telegram in reverse byte order"""
return self._id_nr_field[::-1]

@property
def isLSBOrder(self):
def isLSBOrder(self) -> bool:
return not (self._ci_field.parts[0] & self.MODE_BIT_MASK)

@property
def noDataHeader(self):
def noDataHeader(self) -> bool:
return (self._ci_field.parts and self._ci_field.parts[0] == 0x78)

@property
def isVariableData(self):
def isVariableData(self) -> bool:
return (self._ci_field.parts[0] in self.CI_VARIABLE_DATA)

@property
Expand Down Expand Up @@ -305,7 +307,7 @@ def sig_field(self, value):
self._sig_field = TelegramField(value)

@property
def interpreted(self):
def interpreted(self) -> Dict[str, Union[str, int]]:
if self.noDataHeader:
return {
'type': hex(self.ci_field.parts[0])
Expand All @@ -327,21 +329,21 @@ def to_JSON(self):


class TelegramBody(object):
def __init__(self):
def __init__(self) -> None:
self._bodyHeader = TelegramBodyHeader()
self._bodyPayload = TelegramBodyPayload(parent=self)
self._bodyHeaderLength = 13

@property
def isVariableData(self):
def isVariableData(self) -> bool:
return self._bodyHeader.isVariableData

@property
def isFixedData(self):
return self._bodyHeader.isFixedData

@property
def noDataHeader(self):
def noDataHeader(self) -> bool:
return (self._bodyHeader.noDataHeader)

@property
Expand Down Expand Up @@ -376,13 +378,13 @@ def more_records_follow(self):
return self._bodyPayload.more_records_follow

@property
def interpreted(self):
def interpreted(self) -> Dict[str, Union[Dict[str, Union[str, int]], List[Union[Dict[str, Union[str, int]], Dict[str, Union[Decimal, str, int]]]]]]:
return {
'header': self.bodyHeader.interpreted,
'records': self.bodyPayload.interpreted,
}

def load(self, body):
def load(self, body: List[int]) -> None:
self.bodyHeader = body[0:self.bodyHeaderLength]
self.bodyPayload.load(body[self.bodyHeaderLength:])

Expand Down
13 changes: 7 additions & 6 deletions meterbus/telegram_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from .exceptions import MBusFrameCRCError, MBusFrameDecodeError, MBusFrameEncodeError, FrameMismatch

from .defines import *
from typing import Iterator, List, Optional, Union

class TelegramControl(object):
@staticmethod
def parse(data):
def parse(data: Optional[List[int]]) -> "TelegramControl":
if data is None:
raise MBusFrameDecodeError("Data is None")

Expand All @@ -18,7 +19,7 @@ def parse(data):

return TelegramControl(data)

def __init__(self, dbuf=None):
def __init__(self, dbuf: Optional[Union[str, bytes, List[int]]]=None) -> None:
self._header = TelegramHeader()
self._body = TelegramBody()

Expand Down Expand Up @@ -113,15 +114,15 @@ def body(self):
def body(self, value):
self._body = value

def compute_crc(self):
def compute_crc(self) -> int:
return (self.header.cField.parts[0] +
self.header.aField.parts[0] +
self.body.bodyHeader.ci_field.parts[0]) % 256

def check_crc(self):
def check_crc(self) -> bool:
return self.compute_crc() == self.header.crcField.parts[0]

def __len__(self):
def __len__(self) -> int:
return (
len(self.header.startField.parts) * 2 +
len(self.header.lField.parts) * 2 +
Expand All @@ -132,7 +133,7 @@ def __len__(self):
len(self.header.stopField.parts)
)

def __iter__(self):
def __iter__(self) -> Iterator[int]:
yield self.header.startField.parts[0]
yield len(self.body.bodyHeader.ci_field.parts) + 2
yield len(self.body.bodyHeader.ci_field.parts) + 2
Expand Down
21 changes: 11 additions & 10 deletions meterbus/telegram_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

from builtins import (bytes, str, open, super, range,
zip, round, input, int, pow, object)
from typing import List, Optional, Union


class TelegramField(object):
def __init__(self, parts=None):
self._parts = []
def __init__(self, parts: Optional[Union[int, List[int]]]=None) -> None:
self._parts: List[int] = []

if parts is not None:
if isinstance(parts, str):
Expand All @@ -20,7 +21,7 @@ def __init__(self, parts=None):
self.parts += [parts]

@property
def decodeInt(self):
def decodeInt(self) -> int:
int_data = self.parts
value = 0
neg = int_data[-1] & 0x80
Expand All @@ -40,7 +41,7 @@ def decodeInt(self):
return value

@property
def decodeBCD(self):
def decodeBCD(self) -> int:
bcd_data = self.parts
val = 0

Expand All @@ -64,23 +65,23 @@ def decodeReal(self):
return struct.unpack('f', bytes(real_data))[0]

@property
def decodeManufacturer(self):
def decodeManufacturer(self) -> str:
m_id = self.decodeInt
return "{0}{1}{2}".format(
chr(((m_id >> 10) & 0x001F) + 64),
chr(((m_id >> 5) & 0x001F) + 64),
chr(((m_id) & 0x001F) + 64))

@property
def decodeASCII(self):
def decodeASCII(self) -> str:
return "".join(map(chr, reversed(self.parts)))

@property
def decodeRAW(self):
def decodeRAW(self) -> str:
return " ".join(map(lambda x: "%02X" % x, self.parts))

@property
def decodeDate(self):
def decodeDate(self) -> str:
return DateCalculator.getDate(
self.parts[0], self.parts[1], False)

Expand Down Expand Up @@ -144,8 +145,8 @@ def __str__(self):
return " ".join(
[hex(x).replace('0x', '').zfill(2) for x in self.parts])

def __getitem__(self, key):
def __getitem__(self, key: slice) -> List[int]:
return self.parts[key]

def __len__(self):
def __len__(self) -> int:
return len(self.parts)
13 changes: 7 additions & 6 deletions meterbus/telegram_header.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import simplejson as json
from .telegram_field import TelegramField
from typing import Dict, List, Union


class TelegramHeader(object):
def __init__(self):
def __init__(self) -> None:
self._startField = TelegramField([0x68])
self._lField = TelegramField([0x00])
self._cField = TelegramField()
Expand All @@ -15,11 +16,11 @@ def __init__(self):
self._headerLengthCRCStop = 8

@property
def headerLength(self):
def headerLength(self) -> int:
return self._headerLength

@property
def headerLengthCRCStop(self):
def headerLengthCRCStop(self) -> int:
return self._headerLengthCRCStop

@property
Expand Down Expand Up @@ -71,7 +72,7 @@ def stopField(self, value):
self._stopField = TelegramField(value)

@property
def interpreted(self):
def interpreted(self) -> Dict[str, str]:
return {
'start': hex(self.startField.parts[0]),
'length': hex(self.lField.parts[0]),
Expand All @@ -81,7 +82,7 @@ def interpreted(self):
'stop': hex(self.stopField.parts[0])
}

def load(self, hat):
def load(self, hat: Union[str, List[int]]) -> None:
header = hat
if isinstance(hat, str):
header = list(map(ord, hat))
Expand All @@ -102,6 +103,6 @@ def load(self, hat):
self.crcField = header[-2]
self.stopField = header[-1]

def to_JSON(self):
def to_JSON(self) -> str:
return json.dumps(self.interpreted, sort_keys=False,
indent=4, use_decimal=True)
Loading