Skip to content

Commit

Permalink
Merge pull request #63 from scs/feature/lge570_integration
Browse files Browse the repository at this point in the history
Feature/lge570 integration
  • Loading branch information
raymar9 authored Mar 13, 2024
2 parents 72a0093 + 833234c commit 4830565
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ The following smart meters are supported (see [Wiki/Home](https://github.com/scs

* Landis+Gyr E450: \
Data pushed by smart meter over CII interface (wired M-Bus, HDLC, DLMS/COSEM).
* Landis+Gyr E570: \
Data pushed by smart meter over CII interface (wired M-Bus, HDLC, DLMS/COSEM).
* Landis+Gyr E360: \
Data pushed by smart meter over P1 interface (HDLC, DLMS/COSEM only, no DSMR).
* Iskraemeco AM550: \
Expand Down
8 changes: 8 additions & 0 deletions smartmeter_datacollector/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .smartmeter.kamstrup_han import KamstrupHAN
from .smartmeter.lge360 import LGE360
from .smartmeter.lge450 import LGE450
from .smartmeter.lge570 import LGE570
from .smartmeter.meter import Meter, MeterError


Expand All @@ -34,6 +35,13 @@ def build_meters(config: ConfigParser) -> List[Meter]:
decryption_key=meter_config.get('key'),
use_system_time=meter_config.getboolean('systemtime', False)
))
elif meter_type == "lge570":
meters.append(LGE570(
port=meter_config.get('port', "/dev/ttyUSB0"),
baudrate=meter_config.getint('baudrate', LGE570.BAUDRATE),
decryption_key=meter_config.get('key'),
use_system_time=meter_config.getboolean('systemtime', False)
))
elif meter_type == "lge360":
meters.append(LGE360(
port=meter_config.get('port', "/dev/ttyUSB0"),
Expand Down
43 changes: 43 additions & 0 deletions smartmeter_datacollector/smartmeter/lge570.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#
# Copyright (C) 2022 Supercomputing Systems AG
# This file is part of smartmeter-datacollector.
#
# SPDX-License-Identifier: GPL-2.0-only
# See LICENSES/README.md for more information.
#
import logging
from typing import Optional

import serial

from .cosem import Cosem
from .meter import MeterError, SerialHdlcDlmsMeter
from .reader import ReaderError
from .serial_reader import SerialConfig

LOGGER = logging.getLogger("smartmeter")


class LGE570(SerialHdlcDlmsMeter):
BAUDRATE = 2400

def __init__(self, port: str,
baudrate: int = BAUDRATE,
decryption_key: Optional[str] = None,
use_system_time: bool = False) -> None:
serial_config = SerialConfig(
port=port,
baudrate=baudrate,
data_bits=serial.EIGHTBITS,
parity=serial.PARITY_EVEN,
stop_bits=serial.STOPBITS_ONE,
termination=SerialHdlcDlmsMeter.HDLC_FLAG
)
cosem = Cosem(fallback_id=port)
try:
super().__init__(serial_config, cosem, decryption_key, use_system_time)
except ReaderError as ex:
LOGGER.fatal("Unable to setup serial reader for L+G E570. '%s'", ex)
raise MeterError("Failed setting up L+G E570.") from ex

LOGGER.info("Successfully set up L+G E570 smart meter on '%s'.", port)
56 changes: 56 additions & 0 deletions tests/test_lge570.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#
# Copyright (C) 2022 Supercomputing Systems AG
# This file is part of smartmeter-datacollector.
#
# SPDX-License-Identifier: GPL-2.0-only
# See LICENSES/README.md for more information.
#
import sys
from typing import List

import pytest
from pytest_mock.plugin import MockerFixture

from smartmeter_datacollector.smartmeter.lge570 import LGE570
from smartmeter_datacollector.smartmeter.meter_data import MeterDataPointTypes

from .utils import *


@pytest.mark.skipif(sys.version_info < (3, 8), reason="Python3.7 does not support AsyncMock.")
@pytest.mark.asyncio
async def test_lge570_parse_and_provide_encrypted_data(mocker: MockerFixture,
encrypted_valid_data_lge570: List[bytes]):
observer = mocker.stub("collector_mock")
observer.mock_add_spec(['notify'])
serial_mock = mocker.patch("smartmeter_datacollector.smartmeter.meter.SerialReader",
autospec=True).return_value
meter = LGE570("/test/port", decryption_key="101112131415161718191A1B1C1D1E1F")
meter.register(observer)

def data_received():
for frame in encrypted_valid_data_lge570:
meter._data_received(frame)
serial_mock.start_and_listen.side_effect = data_received

await meter.start()

serial_mock.start_and_listen.assert_awaited_once()
observer.notify.assert_called_once()
values = observer.notify.call_args.args[0]
assert isinstance(values, list)
assert any(data.type == MeterDataPointTypes.ACTIVE_POWER_P.value for data in values)
assert any(data.type == MeterDataPointTypes.ACTIVE_POWER_N.value for data in values)
assert any(data.type == MeterDataPointTypes.REACTIVE_POWER_P.value for data in values)
assert any(data.type == MeterDataPointTypes.REACTIVE_POWER_N.value for data in values)
assert any(data.type == MeterDataPointTypes.ACTIVE_ENERGY_P.value for data in values)
assert any(data.type == MeterDataPointTypes.ACTIVE_ENERGY_N.value for data in values)
assert any(data.type == MeterDataPointTypes.REACTIVE_ENERGY_Q1.value for data in values)
assert any(data.type == MeterDataPointTypes.REACTIVE_ENERGY_Q2.value for data in values)
assert any(data.type == MeterDataPointTypes.REACTIVE_ENERGY_Q3.value for data in values)
assert any(data.type == MeterDataPointTypes.REACTIVE_ENERGY_Q4.value for data in values)
assert any(data.type == MeterDataPointTypes.CURRENT_L1.value for data in values)
assert any(data.type == MeterDataPointTypes.CURRENT_L2.value for data in values)
assert any(data.type == MeterDataPointTypes.CURRENT_L3.value for data in values)
assert any(data.type == MeterDataPointTypes.POWER_FACTOR.value for data in values)
assert all(data.source == "LGZ1030769231253" for data in values)
12 changes: 12 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ def unencrypted_valid_data_iskra() -> List[bytes]:
return data


@pytest.fixture
def encrypted_valid_data_lge570() -> List[bytes]:
data_str: List[str] = []
data_str.append("7E A0 88 CE 9B 03 13 0E 9A E6 E7 00 E0 40 00 01 00 00 74 DB 08 4C 47 5A 67 74 20 62 95 82 01 FF 20 00 00 24 76 2F 13 88 34 08 13 B4 C9 27 EC 19 C3 9F A9 36 B6 F7 A3 FE 6F 63 CE 15 5C 0B E7 58 5D 3E AE 76 2C 0E 39 1A E4 AC BA 9D 9C 24 00 F8 72 57 CA 7C 28 46 02 71 36 FF 19 77 8B AB AC 6B 0C 56 80 21 EB 2E E5 DF DA 3C 39 BC 28 09 2D 6C 3B 32 E8 D3 3B 63 EC E8 93 D9 C3 D4 23 A4 42 00 D9 DB 27 10 0E 9B 32 E7 C2 7E")
data_str.append("7E A0 85 CE 9B 03 13 7A E6 E0 40 00 02 00 00 74 23 D8 DA F9 65 DC DC 8F 3A 42 CD D8 2C D3 1E FF B9 89 FF 60 7C E6 4D 8B 34 85 7A B2 86 3E 80 49 C8 E2 AC A0 59 86 ED 6C D5 3A 83 71 CC 85 01 1D 4E 63 24 51 31 C6 76 07 F2 5A 62 39 B0 58 46 B2 7B 79 00 70 33 A3 97 AB 21 A6 5B 34 30 8C 53 58 B9 D9 22 4A E4 A1 17 9D B1 77 13 B3 2C 2D 42 3A 0C 87 60 4F 0A F9 DA DE AF F6 DE E1 C1 4E 0C AF 8A 53 A3 B7 88 DA 7E")
data_str.append("7E A0 85 CE 9B 03 13 7A E6 E0 40 00 03 00 00 74 2B CF 7B 29 B7 D2 E5 71 FF F8 BC 97 C9 83 A8 E0 6A C7 C1 F4 2D C7 C5 A8 90 E4 24 C6 55 C5 68 8C 89 F5 B3 47 D5 D3 87 FF 3A 37 58 AF F8 D4 FD 2E 6E 23 BC 92 B3 18 EB 26 69 16 AF 1E 03 F4 DC B6 E1 59 AB 46 8C C4 7A 27 34 E0 A5 5E B3 26 F3 EF 3B 45 83 B3 13 8E 72 D3 FD BA 36 EB CC 35 C7 37 5B A8 5E 67 BB C9 50 ED 07 7E B3 83 F9 8C 71 1C 14 64 AC 23 22 C1 7E")
data_str.append("7E A0 85 CE 9B 03 13 7A E6 E0 40 00 04 00 00 74 D6 61 A7 B9 C0 68 C6 75 D0 43 A5 A1 0A 63 F9 7A 38 D1 8D BF 5C 0C 5E 1E E0 10 7B F5 04 89 3E 59 B9 05 47 92 29 13 D4 FE BC 0D 49 28 C0 16 AB 3B BA 94 5E 88 AF 9C 0F B0 4B D0 57 FB 9C D8 10 8E DE 20 F9 F5 A9 5A 2E 65 14 BF 93 AE 0C 8E EA F3 70 39 9A 8D EF 06 52 A4 77 C5 C6 48 2B 4C 99 16 37 28 ED 40 61 F9 3B F5 6E 3F BE 6E AF 05 09 EE 6E 0A 05 6B 17 0C 7E")
data_str.append("7E A0 4D CE 9B 03 13 2D F7 E0 C0 00 05 00 00 3C 41 AE 08 9D A2 FC BA 2E E3 CF 47 2D C8 E3 B5 E6 DB BA BF C5 20 ED E1 F1 3C 54 13 EC 0D A8 B0 D3 96 D7 35 4B 5C 7E E8 2F 9C BA 12 5F 80 B9 C9 38 0C F5 DD 9D 4E 16 48 C3 35 01 FE F3 6E 0F 7E")
data = list(map(lambda frag: bytes.fromhex(frag.replace(" ", "")), data_str))
return data


@pytest.fixture
def encrypted_data_no_pushlist_lg() -> List[bytes]:
data_str: List[str] = []
Expand Down

0 comments on commit 4830565

Please sign in to comment.