Skip to content

Commit

Permalink
Merge pull request #5 from scs/feature/test
Browse files Browse the repository at this point in the history
Feature/test
* backend: add unit tests
    * minor improvement
* run unit tests in Github workflow
  • Loading branch information
raymar9 authored Sep 9, 2021
2 parents 5b4eb20 + 615b27a commit ad18834
Show file tree
Hide file tree
Showing 8 changed files with 462 additions and 3 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/backend-code-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,6 @@ jobs:
- name: Lint with pylint
run: |
pipenv run lint_check
- name: Run unit tests
run: |
pipenv run test
2 changes: 2 additions & 0 deletions backend/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ setuptools = "*"
stdeb = {version="*", markers="sys_platform == 'linux'"}
twine = "*"
wheel = "*"
pytest = "*"

[requires]
python_version = "3.8"
Expand All @@ -37,3 +38,4 @@ lint = "pylint smartmeter_datacollector_configurator/"
lint_check = "pylint smartmeter_datacollector_configurator/ --exit-zero"
setup = "pipenv-setup sync"
setup_check = "pipenv-setup check"
test = "pytest"
33 changes: 32 additions & 1 deletion backend/Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions backend/smartmeter_datacollector_configurator/dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import List, Optional

from pydantic import BaseModel as PydanticBaseModel
from pydantic import Field
from pydantic.class_validators import validator

LOGGER_LEVEL = ["DEBUG", "INFO", "WARNING", "ERROR", "FATAL", "CRITICAL"]
Expand Down Expand Up @@ -37,7 +38,7 @@ def port_not_empty(cls, val: str):


class MqttSinkDto(BaseModel):
type = SinkType.MQTT
type = Field(SinkType.MQTT, const=True)
host: str
port: int = 1883
tls: bool = False
Expand Down Expand Up @@ -69,7 +70,7 @@ def username_password_exists(cls, val: str, values):


class LoggerSinkDto(BaseModel):
type = SinkType.LOGGER
type = Field(SinkType.LOGGER, const=True)
name: str = "DataLogger"

@validator("name")
Expand Down
Empty file added backend/tests/__init__.py
Empty file.
43 changes: 43 additions & 0 deletions backend/tests/test_authentication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import unittest.mock
from pathlib import Path

import pytest

from smartmeter_datacollector_configurator.authentication import AuthManager, SetPasswordError
from smartmeter_datacollector_configurator.dto import CredentialsDto


def test_auth_check_credentials(tmp_path: Path):
manager = AuthManager(str(tmp_path))

# password file must have been created
pwd_file = tmp_path / AuthManager.PWD_FILE_NAME
assert pwd_file.is_file()

assert manager.check_credentials(AuthManager.USERNAME, AuthManager.DEFAULT_PASSWORD)
assert not manager.check_credentials(AuthManager.USERNAME, "incorrect_pw")
assert not manager.check_credentials("invalid_username", AuthManager.DEFAULT_PASSWORD)
assert not manager.check_credentials("invalid_username", "invalid_password")


def test_auth_set_credentials(tmp_path: Path):
manager = AuthManager(str(tmp_path))
pwd_file = tmp_path / AuthManager.PWD_FILE_NAME

assert pwd_file.read_text(encoding='utf-8') == AuthManager.DEFAULT_PASSWORD

NEW_PWD = "new_password"
cred_dto = CredentialsDto(password=NEW_PWD)
manager.set_new_credentials(cred_dto)

assert pwd_file.read_text(encoding='utf-8') == NEW_PWD

assert not manager.check_credentials(AuthManager.USERNAME, AuthManager.DEFAULT_PASSWORD)
assert manager.check_credentials(AuthManager.USERNAME, NEW_PWD)


def test_auth_raise_error_if_unable_to_write():
with unittest.mock.patch('smartmeter_datacollector_configurator.authentication.open') as open_mock:
open_mock.side_effect = OSError()
with pytest.raises(SetPasswordError):
manager = AuthManager("/some/path")
172 changes: 172 additions & 0 deletions backend/tests/test_configurator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
from configparser import ConfigParser, NoOptionError
from pathlib import Path
from typing import Any, Dict

import pytest

import smartmeter_datacollector_configurator.configurator as configurator
from smartmeter_datacollector_configurator.dto import ConfigDto, MeterDto, MeterType, MqttSinkDto


@pytest.fixture
def cfg_basic() -> Dict[str, Any]:
return {
"reader0": {
"type": "lge450",
"port": "/test/port",
},
"sink0": {
"type": "mqtt",
"host": "localhost",
},
"logging": {
"default": "INFO",
}
}


@pytest.fixture
def cfg_tls(cfg_basic) -> Dict[str, Any]:
cfg_basic["sink0"]["port"] = "8883"
cfg_basic["sink0"]["tls"] = "True"
cfg_basic["sink0"]["check_hostname"] = "True"
return cfg_basic


@pytest.fixture
def cfg_with_ca(cfg_tls, tmp_path: Path) -> Dict[str, Any]:
cfg_tls["sink0"]["check_hostname"] = False
cfg_tls["sink0"]["ca_file_path"] = str(tmp_path / configurator.CA_FILE_NAME)
return cfg_tls


def test_read_valid_config_from_file(cfg_basic: Dict[str, Any], tmp_path: Path):
file_path = tmp_path / configurator.CONFIG_FILE_NAME
parser = ConfigParser()
parser.read_dict(cfg_basic)
with open(file_path, 'w', encoding="utf-8") as file:
parser.write(file, True)

dto = configurator.retrieve_config(str(tmp_path))

assert isinstance(dto, ConfigDto)
assert len(dto.meters) == 1
assert isinstance(dto.meters[0], MeterDto)
assert dto.meters[0].type == MeterType.LGE450
assert dto.meters[0].port == cfg_basic["reader0"]["port"]

assert isinstance(dto.mqtt_sink, MqttSinkDto)
assert dto.mqtt_sink.host == cfg_basic["sink0"]["host"]

assert dto.log_level == cfg_basic["logging"]["default"]


def test_read_valid_tls_config_from_file(cfg_tls: Dict[str, Any], tmp_path: Path):
file_path = tmp_path / configurator.CONFIG_FILE_NAME
parser = ConfigParser()
parser.read_dict(cfg_tls)
with open(file_path, 'w', encoding="utf-8") as file:
parser.write(file, True)

dto = configurator.retrieve_config(str(tmp_path))

assert isinstance(dto, ConfigDto)

assert isinstance(dto.mqtt_sink, MqttSinkDto)
assert dto.mqtt_sink.port == int(cfg_tls["sink0"]["port"])
assert dto.mqtt_sink.tls == bool(cfg_tls["sink0"]["tls"])
assert dto.mqtt_sink.check_hostname == bool(cfg_tls["sink0"]["check_hostname"])


def test_read_valid_config_with_ca_cert_from_file(cfg_with_ca: Dict[str, Any], tmp_path: Path):
file_path = tmp_path / configurator.CONFIG_FILE_NAME
ca_file_path = tmp_path / configurator.CA_FILE_NAME
parser = ConfigParser()
parser.read_dict(cfg_with_ca)
with open(file_path, 'w', encoding="utf-8") as file:
parser.write(file, True)
TEST_CA = "123456789123456789ABCDEF"
ca_file_path.write_text(TEST_CA, encoding='utf-8')

dto = configurator.retrieve_config(str(tmp_path))

assert isinstance(dto.mqtt_sink, MqttSinkDto)
assert dto.mqtt_sink.check_hostname == bool(cfg_with_ca["sink0"]["check_hostname"])
assert dto.mqtt_sink.ca_cert == TEST_CA


def test_retrieve_empty_config_if_file_not_found():
dto = configurator.retrieve_config("/inextistent/path/")

assert not dto.meters
assert not dto.mqtt_sink
assert not dto.logger_sink
assert dto.log_level == "WARNING"


def test_retrieve_config_ignore_inexistent_ca_cert(cfg_with_ca: Dict[str, Any], tmp_path: Path):
file_path = tmp_path / configurator.CONFIG_FILE_NAME
parser = ConfigParser()
parser.read_dict(cfg_with_ca)
with open(file_path, 'w', encoding="utf-8") as file:
parser.write(file, True)

dto = configurator.retrieve_config(str(tmp_path))

assert isinstance(dto.mqtt_sink, MqttSinkDto)
assert dto.mqtt_sink.ca_cert == None


def test_write_config_to_file(cfg_tls: Dict[str, Any], tmp_path: Path):
file_path = tmp_path / configurator.CONFIG_FILE_NAME

dto = ConfigDto.parse_obj({
"meters": [
MeterDto.parse_obj(cfg_tls["reader0"])
],
"mqtt_sink": MqttSinkDto.parse_obj(cfg_tls["sink0"]),
"log_level": cfg_tls["logging"]["default"]
})

configurator.write_config_from_dto(str(tmp_path), dto)

parser = ConfigParser()
assert parser.read(file_path)[0] == str(file_path)

assert parser.has_section("reader0")
meter = parser["reader0"]
assert meter["type"] == cfg_tls["reader0"]["type"]
assert meter["port"] == cfg_tls["reader0"]["port"]
assert meter.get("key") == None

assert parser.has_section("sink0")
sink = parser["sink0"]
assert sink["type"] == cfg_tls["sink0"]["type"]
assert sink["tls"] == cfg_tls["sink0"]["tls"]
assert sink.get("ca_file_path") == None


def test_write_config_with_ca_cert_to_file(cfg_with_ca: Dict[str, Any], tmp_path: Path):
file_path = tmp_path / configurator.CONFIG_FILE_NAME
ca_file_path = tmp_path / configurator.CA_FILE_NAME
TEST_CA = "123456789123456789ABCDEF"

dto = ConfigDto.parse_obj({
"mqtt_sink": MqttSinkDto.parse_obj({
"host": cfg_with_ca['sink0']["host"],
"tls": True,
"ca_cert": TEST_CA,
}),
})

configurator.write_config_from_dto(str(tmp_path), dto)

parser = ConfigParser()
parser.read(file_path)

with pytest.raises(NoOptionError):
parser.get("sink0", "ca_cert")

assert parser.get("sink0", "ca_file_path") == str(tmp_path / configurator.CA_FILE_NAME)

assert ca_file_path.read_text(encoding='utf-8') == TEST_CA
Loading

0 comments on commit ad18834

Please sign in to comment.