diff --git a/.github/workflows/hassfest.yaml b/.github/workflows/hassfest.yaml new file mode 100644 index 0000000..100f814 --- /dev/null +++ b/.github/workflows/hassfest.yaml @@ -0,0 +1,14 @@ +name: Validate with hassfest + +on: + push: + pull_request: + # schedule: + # - cron: "0 0 * * *" + +jobs: + validate: + runs-on: "ubuntu-latest" + steps: + - uses: actions/checkout@v4 + - uses: home-assistant/actions/hassfest@master diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml new file mode 100644 index 0000000..05b8ea1 --- /dev/null +++ b/.github/workflows/tests.yaml @@ -0,0 +1,32 @@ +name: Python tests + +on: + workflow_dispatch: + pull_request: + paths: + - '**.py' + push: + paths: + - '**.py' + +jobs: + tests: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ['3.12'] + + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.test.txt + - name: Full test with pytest + run: pytest --cov=. --cov-config=.coveragerc --cov-report xml:coverage.xml \ No newline at end of file diff --git a/.gitignore b/.gitignore index 62c8935..8eddca8 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,8 @@ -.idea/ \ No newline at end of file +.idea/ +.storage/ +blueprints/ +www/ +.HA_VERSION +__pycache__ +home-assistant* +/*.yaml \ No newline at end of file diff --git a/README.md b/README.md index 91ddc6f..bc9c2a5 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,10 @@ Apart from potentially saving some money, this kind of temporal shifting of cons ### Configuration -1. Add the following to your `configuration.yaml` file: +> **IMPORTANT NOTE**: With version 2 configuration via `configuration.yaml` is no longer possible. Converting that configuration via "import config" is still not implemented. You wil have to remove the old now broken manual configuration and create a new via the GUI. + + + ## Optional parameters -There are some optional parameters that could be provided to the sensor, they can be grouped in some categories. + `accept_cost` specifies a price level in the currency of your `nordpool_entity`, that if an "average over a duration" is below this value, it is accepted and used. Even if not the lowest in the range specified. @@ -86,7 +89,7 @@ The planner types has some additional configuration variables ### Moving -Optional parameter `var_search_length_entity` (""). Default value in parenthesis. + `search_length` can be in the range of 2 to 24 and specifies how many hours ahead to serach for lowest price. @@ -113,7 +116,7 @@ The integration will use minimum of `var_search_length_entity` (if supplied and > **WORK IN PROGRESS**: This version of entity is still not fully tested, may need some more work to work properly. -Optional parameters `var_end_hour_entity` ("") and `split_hours` (false). Default values in parenthesis. + `end_hour` can be in the range of 0 to 23 and specifies at what time within 24 hours the ammount of active hours shall be selected. diff --git a/custom_components/nordpool_planner/__init__.py b/custom_components/nordpool_planner/__init__.py index 32a2f1b..247353e 100644 --- a/custom_components/nordpool_planner/__init__.py +++ b/custom_components/nordpool_planner/__init__.py @@ -1,26 +1,551 @@ +"""Main package for planner.""" + from __future__ import annotations -from homeassistant.config_entries import ConfigEntry -from homeassistant.core import Config, HomeAssistant +import datetime as dt +import logging -DOMAIN = "nordpool_planner" -PLATFORMS = ["binary_sensor"] +from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry +from homeassistant.const import STATE_UNAVAILABLE, STATE_UNKNOWN, Platform +from homeassistant.core import HomeAssistant +from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo +from homeassistant.helpers.entity import Entity +from homeassistant.helpers.event import async_track_state_change_event +from homeassistant.util import dt as dt_util +from .const import ( + CONF_ACCEPT_COST_ENTITY, + CONF_ACCEPT_RATE_ENTITY, + CONF_DURATION_ENTITY, + CONF_END_TIME_ENTITY, + CONF_HIGH_COST_ENTITY, + CONF_LOW_COST_ENTITY, + CONF_NP_ENTITY, + CONF_SEARCH_LENGTH_ENTITY, + CONF_TYPE, + CONF_TYPE_MOVING, + CONF_TYPE_STATIC, + DOMAIN, +) -async def async_setup(hass: HomeAssistant, config: Config) -> bool: - hass.data.setdefault(DOMAIN, {}) - return True +_LOGGER = logging.getLogger(__name__) + +PLATFORMS = [Platform.BINARY_SENSOR, Platform.NUMBER] + + +# async def async_setup(hass: HomeAssistant, config: Config) -> bool: +# hass.data.setdefault(DOMAIN, {}) +# return True -async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: - entry_data = dict(entry.data) - hass.data[DOMAIN][entry.entry_id] = entry_data - await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) +# async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: +# entry_data = dict(entry.data) +# hass.data[DOMAIN][entry.entry_id] = entry_data +# await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) +# return True +async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool: + """Set up this integration using UI.""" + config_entry.async_on_unload(config_entry.add_update_listener(async_reload_entry)) + + if DOMAIN not in hass.data: + hass.data[DOMAIN] = {} + + if config_entry.entry_id not in hass.data[DOMAIN]: + hass.data[DOMAIN][config_entry.entry_id] = NordpoolPlanner(hass, config_entry) + # else: + # planner = hass.data[DOMAIN][config_entry.entry_id] + # await planner.async_config_entry_first_refresh() + + if config_entry is not None: + if config_entry.source == SOURCE_IMPORT: + hass.async_create_task( + hass.config_entries.async_remove(config_entry.entry_id) + ) + return False + + await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS) return True +# async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: +# unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) +# if unload_ok: +# hass.data[DOMAIN].pop(entry.entry_id) +# return unload_ok async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Unloading a config_flow entry.""" unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) if unload_ok: hass.data[DOMAIN].pop(entry.entry_id) return unload_ok + + +async def async_reload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> None: + """Reload the config entry.""" + await async_unload_entry(hass, config_entry) + await async_setup_entry(hass, config_entry) + + +class NordpoolPlanner: + """Planner base class.""" + + def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None: + """Initialize my coordinator.""" + self._hass = hass + self._config = config_entry + + # if self._np_entity.unique_id is not None: + # self.async_on_remove( + # async_track_state_change_event( + # self._hass, + # [self._np_entity.unique_id], + # self._async_input_changed, + # ) + # ) + + # Internal states + self._np_entity = NordpoolEntity(self._config.data[CONF_NP_ENTITY]) + + # # TODO: Dont seem to work as expected! + # async_track_state_change_event( + # self._hass, + # [self._np_entity.unique_id], + # self._async_input_changed, + # ) + + # Configuration entities + self._duration_number_entity = "" + self._accept_cost_number_entity = "" + self._accept_rate_number_entity = "" + self._search_length_number_entity = "" + self._end_time_number_entity = "" + # TODO: Make dictionary? + + # Output entities + self._low_cost_binary_sensor_entity = None + self._high_cost_binary_sensor_entity = None + # TODO: Make list? + + # Output states + self.low_cost_state = NordpoolPlannerState() + self.high_cost_state = NordpoolPlannerState() + + @property + def name(self) -> str: + """Name of planner.""" + return self._config.data["name"] + + @property + def _duration(self) -> int: + """Get duration parameter.""" + return self.get_number_entity_value(self._duration_number_entity, integer=True) + + @property + def _is_moving(self) -> bool: + """Get if planner is of type Moving.""" + return self._config.data[CONF_TYPE] == CONF_TYPE_MOVING + + @property + def _search_length(self) -> int: + """Get search length parameter.""" + return self.get_number_entity_value( + self._search_length_number_entity, integer=True + ) + + @property + def _is_static(self) -> bool: + """Get if planner is of type Moving.""" + return self._config.data[CONF_TYPE] == CONF_TYPE_STATIC + + @property + def _end_time(self) -> int: + """Get end time parameter.""" + return self.get_number_entity_value(self._end_time_number_entity, integer=True) + + @property + def _accept_cost(self) -> float: + """Get accept cost parameter.""" + return self.get_number_entity_value(self._accept_cost_number_entity) + + @property + def _accept_rate(self) -> float: + """Get accept rate parameter.""" + return self.get_number_entity_value(self._accept_rate_number_entity) + + def get_number_entity_value( + self, entity_id: str, integer: bool = False + ) -> float | int | None: + """Get value of generic entity parameter.""" + if entity_id: + try: + entity = self._hass.states.get(entity_id) + state = entity.state + value = float(state) + if integer: + return int(value) + return value # noqa: TRY300 + except (TypeError, ValueError): + _LOGGER.warning( + 'Could not convert value "%s" of entity %s to expected format', + state, + entity_id, + ) + except Exception as e: # noqa: BLE001 + _LOGGER.error( + 'Unknown error wen reading and converting "%s": %s', + entity_id, + e, + ) + else: + _LOGGER.debug("No entity defined") + return None + + def register_input_entity_id(self, entity_id, conf_key) -> None: + """Register input entity id.""" + # Input numbers + if conf_key == CONF_DURATION_ENTITY: + self._duration_number_entity = entity_id + elif conf_key == CONF_ACCEPT_COST_ENTITY: + self._accept_cost_number_entity = entity_id + elif conf_key == CONF_ACCEPT_RATE_ENTITY: + self._accept_rate_number_entity = entity_id + elif conf_key == CONF_SEARCH_LENGTH_ENTITY: + self._search_length_number_entity = entity_id + elif conf_key == CONF_END_TIME_ENTITY: + self._end_time_number_entity = entity_id + else: + _LOGGER.warning( + 'An entity "%s" was registred for callback but no match for key "%s"', + entity_id, + conf_key, + ) + # TODO: Dont seem to work as expected! + async_track_state_change_event( + self._hass, + [entity_id], + self._async_input_changed, + ) + + def register_output_listner_entity(self, entity, conf_key="") -> None: + """Register output entity.""" + # Output binary sensors + if conf_key == CONF_LOW_COST_ENTITY: + self._low_cost_binary_sensor_entity = entity + elif conf_key == CONF_HIGH_COST_ENTITY: + self._high_cost_binary_sensor_entity = entity + else: + _LOGGER.warning( + 'An entity "%s" was registred for update but no match for key "%s"', + entity.entity_id, + conf_key, + ) + + def get_device_info(self) -> DeviceInfo: + """Get device info to group entities.""" + return DeviceInfo( + identifiers={(DOMAIN, self._config.data[CONF_TYPE])}, + name=self.name, + manufacturer="Nordpool", + entry_type=DeviceEntryType.SERVICE, + ) + + def input_changed(self, value): + """Input entitiy callback to initiate a planner update.""" + _LOGGER.debug("Sensor change event from callback: %s", value) + self.update() + + async def _async_input_changed(self, event): + """Input entity change callback from state change event.""" + new_state = event.data.get("new_state") + _LOGGER.debug("Sensor change event from HASS: %s", new_state) + self.update() + + def update(self): + """Planner update call function.""" + _LOGGER.debug("Updating planner") + + # Update inputs + if not self._np_entity.update(self._hass): + self.set_unavailable() + return + + if not self._np_entity.valid: + _LOGGER.warning("Aborting update since no valid Nordpool data") + return + + if not self._duration: + _LOGGER.warning("Aborting update since no valid Duration") + return + + if self._is_moving and not self._search_length: + _LOGGER.warning("Aborting update since no valid Search length") + return + + if self._is_static and not self._end_time: + _LOGGER.warning("Aborting update since no valid end time") + return + + # initialize local variables + now = dt_util.now() + duration = dt.timedelta(hours=self._duration - 1) + + if self._is_moving: + end_time = now + dt.timedelta(hours=self._search_length) + elif self._is_static: + end_time = now.replace(minute=0, second=0, microsecond=0) + if self._end_time < now.hour: + end_time += dt.timedelta(days=1) + + else: + _LOGGER.warning("Aborting update since unknown planner type") + return + + prices_groups: list[NordpoolPricesGroup] = [] + offset = 0 + while True: + start_offset = dt.timedelta(hours=offset) + first_time = now + start_offset + last_time = first_time + duration + if offset != 0 and last_time > end_time: + break + offset += 1 + prices_group = self._np_entity.get_prices_group(first_time, last_time) + if not prices_group.valid: + continue + # TODO: Should not end up here, why? + prices_groups.append(prices_group) + + if len(prices_groups) == 0: + _LOGGER.warning( + "Aborting update since no prices fetched in range %s to %s with duration %s", + now, + end_time, + duration, + ) + + _LOGGER.debug( + "Processing %s prices_groups found in range %s to %s", + len(prices_groups), + now, + end_time, + ) + + accept_cost = self._accept_cost + accept_rate = self._accept_rate + lowest_cost_group: NordpoolPricesGroup = prices_groups[0] + for p in prices_groups: + if accept_cost and p.average < accept_cost: + _LOGGER.debug("Accept cost fulfilled") + self.set_lowest_cost_state(p, now) + break + if accept_rate and p.average / self._np_entity.average_attr < accept_rate: + _LOGGER.debug("Accept rate fulfilled") + self.set_lowest_cost_state(p, now) + break + if p.average < lowest_cost_group.average: + lowest_cost_group = p + else: + self.set_lowest_cost_state(lowest_cost_group) + + highest_cost_group: NordpoolPricesGroup = prices_groups[0] + for p in prices_groups: + if p.average > highest_cost_group.average: + highest_cost_group = p + self.set_highest_cost_state(highest_cost_group) + + def set_lowest_cost_state(self, prices_group: NordpoolPricesGroup) -> None: + """Set the state to output variable.""" + self.low_cost_state.starts_at = prices_group.start_time + self.low_cost_state.cost_at = prices_group.average + self.low_cost_state.now_cost_rate = ( + self._np_entity.current_price_attr / prices_group.average + ) + _LOGGER.debug("Wrote lowest cost state: %s", self.low_cost_state) + if self._low_cost_binary_sensor_entity: + self._low_cost_binary_sensor_entity.update_callback() + + def set_highest_cost_state(self, prices_group: NordpoolPricesGroup) -> None: + """Set the state to output variable.""" + self.high_cost_state.starts_at = prices_group.start_time + self.high_cost_state.cost_at = prices_group.average + self.high_cost_state.now_cost_rate = ( + self._np_entity.current_price_attr / prices_group.average + ) + _LOGGER.debug("Wrote highest cost state: %s", self.high_cost_state) + if self._high_cost_binary_sensor_entity: + self._high_cost_binary_sensor_entity.update_callback() + + def set_unavailable(self) -> None: + """Set output state to unavailable.""" + self.low_cost_state.starts_at = STATE_UNAVAILABLE + self.low_cost_state.cost_at = STATE_UNAVAILABLE + self.low_cost_state.now_cost_rate = STATE_UNAVAILABLE + self.high_cost_state.starts_at = STATE_UNAVAILABLE + self.high_cost_state.cost_at = STATE_UNAVAILABLE + self.high_cost_state.now_cost_rate = STATE_UNAVAILABLE + _LOGGER.debug("Setting output states to unavailable") + if self._low_cost_binary_sensor_entity: + self._low_cost_binary_sensor_entity.update_callback() + if self._high_cost_binary_sensor_entity: + self._high_cost_binary_sensor_entity.update_callback() + + +class NordpoolEntity: + """Represenatation for Nordpool state.""" + + def __init__(self, unique_id: str) -> None: + """Initialize state tracker.""" + self._unique_id = unique_id + self._np = None + + @property + def unique_id(self) -> str: + """Get the unique id.""" + return self._unique_id + + @property + def valid(self) -> bool: + """Get if data is valid.""" + # TODO: Add more checks, make function of those in update() + return self._np is not None + + @property + def prices(self): + """Get the prices.""" + np_prices = self._np.attributes["today"] + if self._np.attributes["tomorrow_valid"]: + np_prices += self._np.attributes["tomorrow"] + return np_prices + + @property + def _prices_raw(self): + np_prices = self._np.attributes["raw_today"] + if self._np.attributes["tomorrow_valid"]: + np_prices += self._np.attributes["raw_tomorrow"] + return np_prices + + @property + def average_attr(self): + """Get the average price attribute.""" + return self._np.attributes["average"] + + @property + def current_price_attr(self): + """Get the curent price attribute.""" + return self._np.attributes["current_price"] + + def update(self, hass: HomeAssistant) -> bool: + """Update price in storage.""" + np = hass.states.get(self._unique_id) + if np is None: + _LOGGER.warning("Got empty data from Norpool entity %s ", self._unique_id) + elif "today" not in np.attributes: + _LOGGER.warning( + "No values for today in Norpool entity %s ", self._unique_id + ) + else: + _LOGGER.debug( + "Nordpool sensor %s was updated sucsessfully", self._unique_id + ) + if self._np is None: + pass + self._np = np + + if self._np is None: + return False + return True + + def get_prices_group( + self, start: dt.datetime, end: dt.datetime + ) -> NordpoolPricesGroup: + """Get a range of prices from NP given the start and end datatimes. + + Ex. If start is 7:05 and end 10:05, a list of 4 prices will be returned, + 7, 8, 9 & 10. + """ + started = False + selected = [] + for p in self._prices_raw: + if p["start"] > start - dt.timedelta(hours=1): + started = True + if p["start"] > end: + break + if started: + selected.append(p) + return NordpoolPricesGroup(selected) + + +class NordpoolPricesGroup: + """A slice if Nordpool prices with helper functions.""" + + def __init__(self, prices) -> None: + """Initialize price group.""" + self._prices = prices + + def __str__(self) -> str: + """Get string representation of class.""" + return f"start_time={self.start_time.strftime("%Y-%m-%d %H:%M")} average={self.average} len(_prices)={len(self._prices)}" + + def __repr__(self) -> str: + """Get string representation for debugging.""" + return type(self).__name__ + f" ({self.__str__()})" + + @property + def valid(self) -> bool: + """Is the price group valid.""" + if len(self._prices) == 0: + # _LOGGER.debug("None-valid price range group, len=%s", len(self._prices)) + return False + return True + + @property + def average(self) -> float: + """The average price of the price group.""" + # if not self.valid: + # _LOGGER.warning( + # "Average set to 1 for invalid price group, should not happen" + # ) + # return 1 + return sum([p["value"] for p in self._prices]) / len(self._prices) + + @property + def start_time(self) -> dt.datetime: + """The start time of first price in group.""" + # if not self.valid: + # _LOGGER.warning( + # "Start time set to None for invalid price group, should not happen" + # ) + # return None + return self._prices[0]["start"] + + +class NordpoolPlannerState: + """State attribute representation.""" + + def __init__(self) -> None: + """Initiate states.""" + self.starts_at = STATE_UNKNOWN + self.cost_at = STATE_UNKNOWN + self.now_cost_rate = STATE_UNKNOWN + + def __str__(self) -> str: + """Get string representation of class.""" + return f"start_at={self.starts_at} cost_at={self.cost_at:.2} now_cost_rate={self.now_cost_rate:.2}" + + +class NordpoolPlannerEntity(Entity): + """Base class for nordpool planner entities.""" + + def __init__( + self, + planner: NordpoolPlanner, + ) -> None: + """Initialize entity.""" + # Input configs + self._planner = planner + self._attr_device_info = planner.get_device_info() + + @property + def should_poll(self): + """No need to poll. Coordinator notifies entity of updates.""" + return False diff --git a/custom_components/nordpool_planner/binary_sensor.py b/custom_components/nordpool_planner/binary_sensor.py index e5eed5a..e23d51f 100644 --- a/custom_components/nordpool_planner/binary_sensor.py +++ b/custom_components/nordpool_planner/binary_sensor.py @@ -1,348 +1,149 @@ +"""Binary sensor definitions.""" + from __future__ import annotations import logging -from typing import Any -import voluptuous as vol -import homeassistant.helpers.config_validation as cv -from homeassistant.components.binary_sensor import PLATFORM_SCHEMA, BinarySensorEntity -from homeassistant.const import STATE_UNKNOWN -from homeassistant.core import HomeAssistant -from homeassistant.helpers.entity_platform import AddEntitiesCallback -from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType -from homeassistant.util import dt -_LOGGER = logging.getLogger(__name__) +from homeassistant.components.binary_sensor import ( + BinarySensorEntity, + BinarySensorEntityDescription, +) +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import STATE_UNAVAILABLE, STATE_UNKNOWN +from homeassistant.core import HomeAssistant +from homeassistant.util import dt as dt_util -NORDPOOL_ENTITY = "nordpool_entity" -ENTITY_ID = "entity_id" -PLANNER_TYPE = "planner_type" -MOVING = "moving" -STATIC = "static" -SEARCH_LENGTH = "search_length" -VAR_SEARCH_LENGTH_ENTITY = "var_search_length_entity" -DURATION = "duration" -VAR_DURATION_ENTITY = "var_duration_entity" -END_HOUR = "end_hour" -VAR_END_HOUR_ENTITY = "var_end_hour_entity" -SPLIT_HOURS = "split_hours" -ACCEPT_COST = "accept_cost" -ACCEPT_RATE = "accept_rate" -TYPE_GROUP = "type" -TYPE_DUPLICATE_MSG = f'One entity can only be one of "{MOVING}" and "{STATIC}", please remove either or split in two entities.' -TYPE_MISSING_MSG = f'One of "{MOVING}" and "{STATIC}" must be spcified' +from . import NordpoolPlanner, NordpoolPlannerEntity +from .const import CONF_HIGH_COST_ENTITY, CONF_LOW_COST_ENTITY, DOMAIN +_LOGGER = logging.getLogger(__name__) -def optional_entity_id(value: Any) -> str: - """Validate Entity ID if not Empty""" - if not value: - return "" - return cv.entity_id(value) +# LOW_COST_ENTITY_DESCRIPTION = BinarySensorEntityDescription( +# key=CONF_LOW_COST_ENTITY, +# # device_class=BinarySensorDeviceClass.???, +# ) -# https://developers.home-assistant.io/docs/development_validation/ -# https://github.com/home-assistant/core/blob/dev/homeassistant/helpers/config_validation.py -PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( - { - vol.Required(NORDPOOL_ENTITY): cv.entity_id, - vol.Required(ENTITY_ID): vol.All(vol.Coerce(str)), - vol.Optional(DURATION, default=2): vol.All( - vol.Coerce(int), - vol.Range(min=1, max=5), - ), - vol.Optional(VAR_DURATION_ENTITY, default=""): optional_entity_id, - vol.Optional(ACCEPT_COST, default=0.0): vol.All( - vol.Coerce(float), vol.Range(min=-10000.0, max=10000.0) - ), - vol.Optional(ACCEPT_RATE, default=0.0): vol.All( - vol.Coerce(float), vol.Range(min=-10000.0, max=10000.0) - ), - # Moving planner exclusives - vol.Exclusive(MOVING, TYPE_GROUP, msg=TYPE_DUPLICATE_MSG): { - vol.Required(SEARCH_LENGTH): vol.All( - vol.Coerce(int), vol.Range(min=2, max=24) - ), - vol.Optional(VAR_SEARCH_LENGTH_ENTITY, default=""): optional_entity_id, - }, - # Static planner exclusive - vol.Exclusive(STATIC, TYPE_GROUP, msg=TYPE_DUPLICATE_MSG): { - vol.Required(END_HOUR): vol.All(vol.Coerce(int), vol.Range(min=0, max=23)), - vol.Optional(VAR_END_HOUR_ENTITY, default=""): optional_entity_id, - vol.Optional(SPLIT_HOURS, default=False): vol.Coerce(bool), - }, - # Check either is configured - # vol.Required( - # vol.Any(MOVING, STATIC), - # msg=TYPE_MISSING_MSG, - # ): object, - }, -) +async def async_setup_entry( + hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities +): + """Create state binary sensor entities for platform.""" + planner: NordpoolPlanner = hass.data[DOMAIN][config_entry.entry_id] + entities = [] -def setup_platform( - hass: HomeAssistant, - config: ConfigType, - add_entities: AddEntitiesCallback, - discovery_info: DiscoveryInfoType | None = None, -) -> None: - nordpool_entity = config[NORDPOOL_ENTITY] - entity_id = config[ENTITY_ID] - duration = config[DURATION] - var_duration_entity = config[VAR_DURATION_ENTITY] - accept_cost = config[ACCEPT_COST] - accept_rate = config[ACCEPT_RATE] - if MOVING in config.keys(): - search_length = config[MOVING][SEARCH_LENGTH] - var_search_length_entity = config[MOVING][VAR_SEARCH_LENGTH_ENTITY] - add_entities( - [ - NordpoolMovingPlannerSensor( - search_length=search_length, - var_search_length_entity=var_search_length_entity, - nordpool_entity=nordpool_entity, - entity_id=entity_id, - duration=duration, - var_duration_entity=var_duration_entity, - accept_cost=accept_cost, - accept_rate=accept_rate, - ) - ] + if config_entry.data.get(CONF_LOW_COST_ENTITY): + entities.append( + NordpoolPlannerBinarySensor( + planner, + entity_description=BinarySensorEntityDescription( + key=CONF_LOW_COST_ENTITY, + # device_class=BinarySensorDeviceClass.???, + ), + ) ) - if STATIC in config.keys(): - end_hour = config[STATIC][END_HOUR] - var_end_hour_entity = config[STATIC][VAR_END_HOUR_ENTITY] - split_hours = config[STATIC][SPLIT_HOURS] - add_entities( - [ - NordpoolStaticPlannerSensor( - end_hour=end_hour, - var_end_hour_entity=var_end_hour_entity, - nordpool_entity=nordpool_entity, - entity_id=entity_id, - duration=duration, - var_duration_entity=var_duration_entity, - split_hours=split_hours, - accept_cost=accept_cost, - accept_rate=accept_rate, - ) - ] + + if config_entry.data.get(CONF_HIGH_COST_ENTITY): + entities.append( + NordpoolPlannerBinarySensor( + planner, + entity_description=BinarySensorEntityDescription( + key=CONF_HIGH_COST_ENTITY, + # device_class=BinarySensorDeviceClass.???, + ), + ) ) + async_add_entities(entities) + return True -class NordpoolPlannerSensor(BinarySensorEntity): - """Base class for nordpool planner""" + +class NordpoolPlannerBinarySensor(NordpoolPlannerEntity, BinarySensorEntity): + """Binary state sensor.""" _attr_icon = "mdi:flash" def __init__( self, - nordpool_entity, - entity_id, - duration, - var_duration_entity, - accept_cost, - accept_rate, - ): - # Input configs - self._nordpool_entity = nordpool_entity - self._duration = duration - self._var_duration_entity = var_duration_entity - self._accept_cost = accept_cost - self._accept_rate = accept_rate - - # Entity identification - entity_id = entity_id.replace(" ", "_") - self._attr_name = f"nordpool_planner_{entity_id}" - self._attr_unique_id = entity_id - - # Internal state - self._np = None + planner, + entity_description: BinarySensorEntityDescription, + ) -> None: + """Initialize the entity.""" + super().__init__(planner) + self.entity_description = entity_description + self._attr_name = ( + self._planner.name + + " " + + entity_description.key.replace("_entity", "").replace("_", " ") + ) + self._attr_unique_id = ( + ("nordpool_planner_" + self._attr_name) + .lower() + .replace(".", "") + .replace(" ", "_") + ) - # Output states - self._attr_is_on = STATE_UNKNOWN - self._starts_at = STATE_UNKNOWN - self._cost_at = STATE_UNKNOWN - self._now_cost_rate = STATE_UNKNOWN + @property + def is_on(self): + """Output state.""" + state = STATE_UNKNOWN + # TODO: This can be made nicer to get value from states in dictionary in planner + if self.entity_description.key == CONF_LOW_COST_ENTITY: + if self._planner.low_cost_state.starts_at not in [ + STATE_UNKNOWN, + STATE_UNAVAILABLE, + ]: + state = self._planner.low_cost_state.starts_at < dt_util.now() + if self.entity_description.key == CONF_HIGH_COST_ENTITY: + if self._planner.high_cost_state.starts_at not in [ + STATE_UNKNOWN, + STATE_UNAVAILABLE, + ]: + state = self._planner.high_cost_state.starts_at < dt_util.now() + _LOGGER.debug( + 'Returning state "%s" of binary sensor "%s"', + state, + self.unique_id, + ) + return state @property def extra_state_attributes(self): - """Provide attributes for the entity""" - return { - "starts_at": self._starts_at, - "cost_at": self._cost_at, - "now_cost_rate": self._now_cost_rate, + """Extra state attributes.""" + state_attributes = { + "starts_at": STATE_UNKNOWN, + "cost_at": STATE_UNKNOWN, + "now_cost_rate": STATE_UNKNOWN, } - - def _update_np_prices(self): - np = self.hass.states.get(self._nordpool_entity) - if np is None: - _LOGGER.warning( - "Got empty data from Norpool entity %s ", self._nordpool_entity - ) - return - if "today" not in np.attributes.keys(): - _LOGGER.warning( - "No values for today in Norpool entity %s ", self._nordpool_entity - ) - return - self._np = np - - @property - def _np_prices(self): - np_prices = self._np.attributes["today"][:] # Use copy of list - if self._np.attributes["tomorrow_valid"]: - np_prices += self._np.attributes["tomorrow"][:] - return np_prices - - @property - def _np_average(self): - return self._np.attributes["average"] - - @property - def _np_current(self): - return self._np.attributes["current_price"] - - def _get_input_entity_or_default(self, entity_id, default): - if entity_id: - input_value = self.hass.states.get(entity_id) - if not input_value or not input_value.state[0].isdigit(): - return default - try: - input_value = int(input_value.state.split(".")[0]) - if input_value is not None: - return input_value - except TypeError: - _LOGGER.debug( - 'Could not convert value "%s" of entity %s to int', - input_value.state, - entity_id, - ) - return default - - def _update(self, start_hour, search_length: int): - # Evaluate data - now = dt.now() - min_average = self._np_current - min_start_hour = now.hour - # Only search if current is above acceptable rates and in range - if ( - now.hour >= start_hour - and min_average > self._accept_cost - and (min_average / self._np_average) > self._accept_rate - ): - duration = self._get_input_entity_or_default( - self._var_duration_entity, self._duration - ) - for i in range( - start_hour, - min(now.hour + search_length, len(self._np_prices) - duration), - ): - prince_range = self._np_prices[i : i + duration] - # Nordpool sometimes returns null prices, https://github.com/custom-components/nordpool/issues/125 - # If more than 50% is Null in selected range skip. - if len([x for x in prince_range if x is None]) * 2 > len(prince_range): - _LOGGER.debug("Skipping range at %s as to many empty", i) - continue - prince_range = [x for x in prince_range if x is not None] - average = sum(prince_range) / len(prince_range) - if average < min_average: - min_average = average - min_start_hour = i - _LOGGER.debug("New min value at %s", i) - if ( - average < self._accept_cost - or (average / self._np_average) < self._accept_rate - ): - min_average = average - min_start_hour = i - _LOGGER.debug("Found range under accept level at %s", i) - break - - # Write result to entity - if now.hour >= min_start_hour: - self._attr_is_on = True - else: - self._attr_is_on = False - - start = dt.parse_datetime( - "%s-%s-%s %s:%s" % (now.year, now.month, now.day, 0, 0) - ) - # Check if next day - if min_start_hour >= 24: - start += dt.parse_duration("1 day") - min_start_hour -= 24 - self._starts_at = "%04d-%02d-%02d %02d:%02d" % ( - start.year, - start.month, - start.day, - min_start_hour, - 0, + # TODO: This can be made nicer to get value from states in dictionary in planner + if self.entity_description.key == CONF_LOW_COST_ENTITY: + state_attributes = { + "starts_at": self._planner.low_cost_state.starts_at, + "cost_at": self._planner.low_cost_state.cost_at, + "now_cost_rate": self._planner.low_cost_state.now_cost_rate, + } + elif self.entity_description.key == CONF_HIGH_COST_ENTITY: + state_attributes = { + "starts_at": self._planner.high_cost_state.starts_at, + "cost_at": self._planner.high_cost_state.cost_at, + "now_cost_rate": self._planner.high_cost_state.now_cost_rate, + } + _LOGGER.debug( + 'Returning extra state attributes "%s" of binary sensor "%s"', + state_attributes, + self.unique_id, ) - self._cost_at = min_average - self._now_cost_rate = self._np_current / min_average - + return state_attributes -class NordpoolMovingPlannerSensor(NordpoolPlannerSensor): - """Nordpool planner with moving search length""" - - def __init__(self, search_length, var_search_length_entity, **kwds): - super().__init__(**kwds) - self._search_length = search_length - self._var_search_length_entity = var_search_length_entity - - def update(self): - """Called from Home Assistant to update entity value""" - self._update_np_prices() - if self._np is not None: - search_length = min( - self._get_input_entity_or_default( - self._var_search_length_entity, self._search_length - ), - self._search_length, - ) - self._update(dt.now().hour, search_length) - - -class NordpoolStaticPlannerSensor(NordpoolPlannerSensor): - """Nordpool planner with fixed search length end time""" - - def __init__(self, end_hour, var_end_hour_entity, split_hours, **kwds): - super().__init__(**kwds) - self._end_hour = end_hour - self._var_end_hour_entity = var_end_hour_entity - self._split_hours = split_hours - - self._now_hour = dt.now().hour - self._remaining = self._get_input_entity_or_default( - self._var_duration_entity, self._duration - ) - - def update(self): - """Called from Home Assistant to update entity value""" - self._update_np_prices() - now = dt.now() - end_hour = self._get_input_entity_or_default( - self._var_end_hour_entity, self._end_hour - ) + async def async_added_to_hass(self) -> None: + """Load the last known state when added to hass.""" + await super().async_added_to_hass() + self._planner.register_output_listner_entity(self, self.entity_description.key) - # Start by checking if hour has changed - if self._now_hour != now.hour: - # Reset needed hours as end has been reached - if now.hour == end_hour: - self._remaining = self._get_input_entity_or_default( - self._var_duration_entity, self._duration - ) - # Else-if since don't want to risk to remove one directly - elif self._attr_is_on: - self._remaining -= 1 - self._now_hour = now.hour + def update_callback(self) -> None: + """Call from planner that new data avaialble.""" + self.schedule_update_ha_state() - if self._remaining == 0: - self._attr_is_on = False - self._starts_at = None - self._cost_at = None - self._now_cost_rate = None - else: - if self._np is not None: - if end_hour < now.hour: - end_hour += 24 - self._update(now.hour, end_hour - now.hour) + # async def async_update(self): + # """Called from Home Assistant to update entity value""" + # self._planner.update() diff --git a/custom_components/nordpool_planner/config_flow.py b/custom_components/nordpool_planner/config_flow.py new file mode 100644 index 0000000..c3352cd --- /dev/null +++ b/custom_components/nordpool_planner/config_flow.py @@ -0,0 +1,125 @@ +"""Config flow for PoolLab integration.""" + +from __future__ import annotations + +import logging +from typing import Any + +import voluptuous as vol + +from homeassistant import config_entries +from homeassistant.data_entry_flow import FlowResult +from homeassistant.helpers import selector + +from .const import ( + CONF_ACCEPT_COST_ENTITY, + CONF_ACCEPT_RATE_ENTITY, + CONF_CURENCY, + CONF_DURATION_ENTITY, + CONF_END_TIME_ENTITY, + CONF_HIGH_COST_ENTITY, + CONF_LOW_COST_ENTITY, + CONF_NAME, + CONF_NP_ENTITY, + CONF_SEARCH_LENGTH_ENTITY, + CONF_TYPE, + CONF_TYPE_LIST, + CONF_TYPE_MOVING, + CONF_TYPE_STATIC, + DOMAIN, +) + +_LOGGER = logging.getLogger(__name__) + + +class NordpoolPlannerConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): + """Nordpool Planner config flow.""" + + VERSION = 1 + data = None + options = None + _reauth_entry: config_entries.ConfigEntry | None = None + + async def async_step_user( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + """Handle initial user step.""" + errors: dict[str, str] = {} + + if user_input is not None: + self.data = user_input + # Add those that are not optional + self.data[CONF_LOW_COST_ENTITY] = True + self.data[CONF_DURATION_ENTITY] = True + if self.data[CONF_TYPE] == CONF_TYPE_MOVING: + self.data[CONF_SEARCH_LENGTH_ENTITY] = True + elif self.data[CONF_TYPE] == CONF_TYPE_STATIC: + self.data[CONF_END_TIME_ENTITY] = True + + self.options = {} + np_entity = self.hass.states.get(self.data[CONF_NP_ENTITY]) + try: + self.options[CONF_CURENCY] = np_entity.attributes.get(CONF_CURENCY) + except (IndexError, KeyError): + _LOGGER.warning("Could not extract currency from Nordpool entity") + + await self.async_set_unique_id( + self.data[CONF_NAME] + + "_" + + self.data[CONF_NP_ENTITY] + + "_" + + self.data[CONF_TYPE] + ) + self._abort_if_unique_id_configured() + + _LOGGER.debug( + 'Creating entry "%s" with data "%s"', + self.unique_id, + self.data, + ) + return self.async_create_entry( + title=self.data[CONF_NAME], data=self.data, options=self.options + ) + + sensor_entities = self.hass.states.async_entity_ids(domain_filter="sensor") + selected_entities = [s for s in sensor_entities if "nordpool" in s] + # TODO: Enable usage for e-ENTSO prices integration + # selected_entities += [ + # s for s in sensor_entities if "current_electricity_market_price" in s + # ] + + if len(selected_entities) == 0: + errors["base"] = "No Nordpool entity found" + + schema = vol.Schema( + { + vol.Required(CONF_NAME): str, + vol.Required(CONF_TYPE): selector.SelectSelector( + selector.SelectSelectorConfig(options=CONF_TYPE_LIST), + ), + vol.Required(CONF_NP_ENTITY): selector.SelectSelector( + selector.SelectSelectorConfig(options=selected_entities), + ), + vol.Required(CONF_ACCEPT_COST_ENTITY, default=False): bool, + vol.Required(CONF_ACCEPT_RATE_ENTITY, default=False): bool, + vol.Required(CONF_HIGH_COST_ENTITY, default=False): bool, + } + ) + + placeholders = { + CONF_TYPE: CONF_TYPE_LIST, + CONF_NP_ENTITY: selected_entities, + } + + return self.async_show_form( + step_id="user", + data_schema=schema, + description_placeholders=placeholders, + errors=errors, + ) + + # async def async_step_import( + # self, user_input: Optional[Dict[str, Any]] | None = None + # ) -> FlowResult: + # """Import nordpool planner config from configuration.yaml.""" + # return await self.async_step_user(import_data) diff --git a/custom_components/nordpool_planner/const.py b/custom_components/nordpool_planner/const.py new file mode 100644 index 0000000..8f0c827 --- /dev/null +++ b/custom_components/nordpool_planner/const.py @@ -0,0 +1,24 @@ +"""Common constants for integration.""" + +DOMAIN = "nordpool_planner" + +CONF_NAME = "name" +CONF_TYPE = "type" +CONF_TYPE_MOVING = "moving" +CONF_TYPE_STATIC = "static" +CONF_TYPE_LIST = [CONF_TYPE_MOVING, CONF_TYPE_STATIC] +CONF_NP_ENTITY = "np_entity" +CONF_LOW_COST_ENTITY = "low_cost_entity" +CONF_HIGH_COST_ENTITY = "high_cost_entity" +# CONF_DURATION = "duration" # Keep these for future "convert from manual config" +CONF_DURATION_ENTITY = "duration_entity" +# CONF_ACCEPT_COST = "accept_cost" +CONF_ACCEPT_COST_ENTITY = "accept_cost_entity" +# CONF_ACCEPT_RATE = "accept_rate" +CONF_ACCEPT_RATE_ENTITY = "accept_rate_entity" +# CONF_SEARCH_LENGTH = "search_length" +CONF_SEARCH_LENGTH_ENTITY = "search_length_entity" +# CONF_END_TIME = "end_time" +CONF_END_TIME_ENTITY = "end_time_entity" + +CONF_CURENCY = "currency" diff --git a/custom_components/nordpool_planner/manifest.json b/custom_components/nordpool_planner/manifest.json index 80bfaae..02befe3 100644 --- a/custom_components/nordpool_planner/manifest.json +++ b/custom_components/nordpool_planner/manifest.json @@ -1,11 +1,12 @@ { "domain": "nordpool_planner", "name": "Nordpool Planner", + "codeowners": ["@dala318"], + "config_flow": true, + "dependencies": ["nordpool"], "documentation": "https://github.com/dala318/nordpool_planner", + "iot_class": "calculated", "issue_tracker": "https://github.com/dala318/nordpool_planner/issues", "requirements": [], - "dependencies": ["nordpool"], - "codeowners": ["@dala318"], - "iot_class": "calculated", - "version": "0.1.4" + "version": "2.0.0" } diff --git a/custom_components/nordpool_planner/number.py b/custom_components/nordpool_planner/number.py new file mode 100644 index 0000000..7a258c1 --- /dev/null +++ b/custom_components/nordpool_planner/number.py @@ -0,0 +1,192 @@ +"""Number definitions.""" + +from __future__ import annotations + +import logging + +from homeassistant.components.number import ( + NumberDeviceClass, + NumberEntityDescription, + RestoreNumber, +) +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import STATE_UNAVAILABLE, STATE_UNKNOWN, UnitOfTime +from homeassistant.core import HomeAssistant + +from . import ( + CONF_ACCEPT_COST_ENTITY, + CONF_ACCEPT_RATE_ENTITY, + CONF_DURATION_ENTITY, + CONF_END_TIME_ENTITY, + CONF_SEARCH_LENGTH_ENTITY, + NordpoolPlanner, + NordpoolPlannerEntity, +) +from .const import CONF_CURENCY, DOMAIN + +_LOGGER = logging.getLogger(__name__) + +DURATION_ENTITY_DESCRIPTION = NumberEntityDescription( + key=CONF_DURATION_ENTITY, + device_class=NumberDeviceClass.DURATION, + native_min_value=1, + native_max_value=8, + native_step=1, + native_unit_of_measurement=UnitOfTime.HOURS, +) +ACCEPT_COST_ENTITY_DESCRIPTION = NumberEntityDescription( + key=CONF_ACCEPT_COST_ENTITY, + device_class=NumberDeviceClass.MONETARY, + native_min_value=-20.0, + native_max_value=20.0, + native_step=0.01, +) +ACCEPT_RATE_ENTITY_DESCRIPTION = NumberEntityDescription( + key=CONF_ACCEPT_RATE_ENTITY, + device_class=NumberDeviceClass.DATA_RATE, + native_min_value=-1.0, + native_max_value=1.0, + native_step=0.1, +) +SEARCH_LENGTH_ENTITY_DESCRIPTION = NumberEntityDescription( + key=CONF_SEARCH_LENGTH_ENTITY, + device_class=NumberDeviceClass.DURATION, + native_min_value=3, + native_max_value=12, + native_step=1, + native_unit_of_measurement=UnitOfTime.HOURS, +) +END_TIME_ENTITY_DESCRIPTION = NumberEntityDescription( + key=CONF_END_TIME_ENTITY, + device_class=NumberDeviceClass.DURATION, + native_min_value=0, + native_max_value=23, + native_step=1, + native_unit_of_measurement=UnitOfTime.HOURS, +) + + +async def async_setup_entry( + hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities +): + """Create configuration number entities for platform.""" + + planner: NordpoolPlanner = hass.data[DOMAIN][config_entry.entry_id] + entities = [] + + if config_entry.data.get(CONF_DURATION_ENTITY): + entities.append( + NordpoolPlannerNumber( + planner, + callback=planner.input_changed, + start_val=3, + entity_description=DURATION_ENTITY_DESCRIPTION, + ) + ) + + if config_entry.data.get(CONF_ACCEPT_COST_ENTITY): + entity_description = ACCEPT_COST_ENTITY_DESCRIPTION + # Override if curensy option is set + if currency := config_entry.options.get(CONF_CURENCY): + entity_description = NumberEntityDescription( + key=ACCEPT_COST_ENTITY_DESCRIPTION.key, + device_class=ACCEPT_COST_ENTITY_DESCRIPTION.device_class, + native_min_value=ACCEPT_COST_ENTITY_DESCRIPTION.native_min_value, + native_max_value=ACCEPT_COST_ENTITY_DESCRIPTION.native_max_value, + native_step=ACCEPT_COST_ENTITY_DESCRIPTION.step, + native_unit_of_measurement=currency, + ) + entities.append( + NordpoolPlannerNumber( + planner, + callback=planner.input_changed, + start_val=0.0, + entity_description=entity_description, + ) + ) + + if config_entry.data.get(CONF_ACCEPT_RATE_ENTITY): + entities.append( + NordpoolPlannerNumber( + planner, + callback=planner.input_changed, + start_val=0.1, + entity_description=ACCEPT_RATE_ENTITY_DESCRIPTION, + ) + ) + + if config_entry.data.get(CONF_SEARCH_LENGTH_ENTITY): + entities.append( + NordpoolPlannerNumber( + planner, + callback=planner.input_changed, + start_val=10, + entity_description=SEARCH_LENGTH_ENTITY_DESCRIPTION, + ) + ) + + if config_entry.data.get(CONF_END_TIME_ENTITY): + entities.append( + NordpoolPlannerNumber( + planner, + callback=planner.input_changed, + start_val=7, + entity_description=END_TIME_ENTITY_DESCRIPTION, + ) + ) + + async_add_entities(entities) + return True + + +class NordpoolPlannerNumber(NordpoolPlannerEntity, RestoreNumber): + """Number config entity.""" + + def __init__( + self, + planner, + callback, + start_val, + entity_description: NumberEntityDescription, + ) -> None: + """Initialize the entity.""" + super().__init__(planner) + self.entity_description = entity_description + self._default_value = start_val + self._callback = callback + self._attr_name = ( + self._planner.name + + " " + + entity_description.key.replace("_entity", "").replace("_", " ") + ) + self._attr_unique_id = ( + ("nordpool_planner_" + self._attr_name) + .lower() + .replace(".", "") + .replace(" ", "_") + ) + + async def async_added_to_hass(self) -> None: + """Load the last known state when added to hass.""" + await super().async_added_to_hass() + if (last_state := await self.async_get_last_state()) and ( + last_number_data := await self.async_get_last_number_data() + ): + if last_state.state not in (STATE_UNKNOWN, STATE_UNAVAILABLE): + self._attr_native_value = last_number_data.native_value + else: + self._attr_native_value = self._default_value + self._planner.register_input_entity_id( + self.entity_id, self.entity_description.key + ) + + async def async_set_native_value(self, value: float) -> None: + """Update the current value.""" + self._attr_native_value = value + _LOGGER.debug( + "Got new async value %s for %s", + value, + self.name, + ) + self._callback(value) + self.async_schedule_update_ha_state() diff --git a/custom_components/nordpool_planner/translations/en.json b/custom_components/nordpool_planner/translations/en.json new file mode 100644 index 0000000..0373a69 --- /dev/null +++ b/custom_components/nordpool_planner/translations/en.json @@ -0,0 +1,27 @@ +{ + "config": { + "step": { + "user": { + "description": "Setup a Nordpool Planner", + "data": { + "name": "Name of planner", + "type": "Planner type", + "np_entity": "Nordpool entity", + "duration_entity": "Duration: Creates dynamic configuration parameter", + "search_length_entity": "Search length: Creates dynamic configuration parameter", + "end_time_entity": "End time: Creates dynamic configuration parameter", + "accept_cost_entity": "Accept cost: Creates a configuration parameter that turn on if cost below", + "accept_rate_entity": "Accept rate: Creates a configuration parameter that turn on if rate to daily average below", + "high_cost_entity": "High cost: Creates a binary sensor that tell in it's the highest cost (inverse of normal)" + } + } + }, + "error": { + "name_exists": "Name already exists", + "invalid_template": "The template is invalid" + }, + "abort": { + "already_configured": "Already configured with the same settings or name" + } + } +} diff --git a/requirements.test.txt b/requirements.test.txt new file mode 100644 index 0000000..501fb16 --- /dev/null +++ b/requirements.test.txt @@ -0,0 +1,14 @@ +# aioresponses +# codecov +# coverage>=5.2.0,<5.3.0 +# mypy + +# fuzzywuzzy +# levenshtein +# ruff +# yamllint + +pytest +pytest-asyncio +pytest-cov +pytest-homeassistant-custom-component diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..bf1b9bd --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for Nordpool Planner.""" diff --git a/tests/test_config_flow.py b/tests/test_config_flow.py new file mode 100644 index 0000000..09bef9f --- /dev/null +++ b/tests/test_config_flow.py @@ -0,0 +1,50 @@ +"""config_flow tests.""" + +from unittest import mock + +from custom_components.nordpool_planner import config_flow +import pytest + +# from pytest_homeassistant_custom_component.async_mock import patch +import voluptuous as vol + +from custom_components.nordpool_planner.const import * +from homeassistant import config_entries +from homeassistant.helpers import selector + +NP_ENTITY_NAME = "sensor.nordpool_ent" + +SCHEMA_COPY = vol.Schema( + { + vol.Required(CONF_NAME): str, + vol.Required(CONF_TYPE): selector.SelectSelector( + selector.SelectSelectorConfig(options=CONF_TYPE_LIST), + ), + vol.Required(CONF_NP_ENTITY): selector.SelectSelector( + selector.SelectSelectorConfig(options=[NP_ENTITY_NAME]), + ), + vol.Required(CONF_ACCEPT_COST_ENTITY, default=False): bool, + vol.Required(CONF_ACCEPT_RATE_ENTITY, default=False): bool, + vol.Required(CONF_HIGH_COST_ENTITY, default=False): bool, + } +) + + +# @pytest.mark.asyncio +# async def test_flow_init(hass): +# """Test the initial flow.""" +# result = await hass.config_entries.flow.async_init( +# config_flow.DOMAIN, context={"source": "user"} +# ) + +# expected = { +# "data_schema": SCHEMA_COPY, +# # "data_schema": config_flow.DATA_SCHEMA, +# "description_placeholders": None, +# "errors": {}, +# "flow_id": mock.ANY, +# "handler": "nordpool_planner", +# "step_id": "user", +# "type": "form", +# } +# assert expected == result diff --git a/tests/test_planner.py b/tests/test_planner.py new file mode 100644 index 0000000..f00cf17 --- /dev/null +++ b/tests/test_planner.py @@ -0,0 +1,91 @@ +"""planner tests.""" + +from unittest import mock + +from custom_components.nordpool_planner import NordpoolPlanner +import pytest + +# from pytest_homeassistant_custom_component.async_mock import patch +# from pytest_homeassistant_custom_component.common import ( +# MockModule, +# MockPlatform, +# mock_integration, +# mock_platform, +# ) + +from custom_components.nordpool_planner.const import * +from homeassistant import config_entries +from homeassistant.components import sensor +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity_platform import AddEntitiesCallback + + +@pytest.mark.asyncio +async def test_planner_init(hass): + """Test the planner initialization.""" + + NAME = "My planner 1" + TYPE = "moving" + DURATION_ENT = "duration_ent" + SEARCH_LENGTH_ENT = "search_len" + NP_ENT = "sensor.np_ent" + CURRENCY = "EUR" + + # async def async_setup_entry_init( + # hass: HomeAssistant, config_entry: config_entries.ConfigEntry + # ) -> bool: + # """Set up test config entry.""" + # await hass.config_entries.async_forward_entry_setups( + # config_entry, [sensor.DOMAIN] + # ) + # return True + + # mock_integration( + # hass, + # MockModule( + # "nordpool", + # async_setup_entry=async_setup_entry_init, + # ), + # ) + + config_entry = config_entries.ConfigEntry( + data={ + CONF_NAME: NAME, + CONF_TYPE: TYPE, + CONF_NP_ENTITY: NP_ENT, + CONF_DURATION_ENTITY: DURATION_ENT, + CONF_SEARCH_LENGTH_ENTITY: SEARCH_LENGTH_ENT, + }, + options={CONF_CURENCY: CURRENCY}, + domain=DOMAIN, + version=1, + minor_version=2, + source="user", + title="Nordpool Planner", + unique_id="123456", + ) + + # # Fake nordpool sensor + # np_sensor = sensor.SensorEntity() + # np_sensor.entity_id = NP_ENT + # np_sensor._attr_device_class = sensor.SensorDeviceClass.MONETARY + + # async def async_setup_entry_platform( + # hass: HomeAssistant, + # config_entry: config_entries.ConfigEntry, + # async_add_entities: AddEntitiesCallback, + # ) -> None: + # """Set up test sensor platform via config entry.""" + # async_add_entities([np_sensor]) + + # mock_platform( + # hass, + # f"{"nordpool"}.{sensor.DOMAIN}", + # MockPlatform(async_setup_entry=async_setup_entry_platform), + # ) + + planner = NordpoolPlanner(hass, config_entry) + + assert planner.name == NAME + assert planner._is_static == False + assert planner._is_moving == True