From 9463df4425bf25f1fc1741ca98baf044282cffe3 Mon Sep 17 00:00:00 2001 From: Ruben Verborgh Date: Sat, 11 Jan 2025 22:28:48 +0100 Subject: [PATCH] Emit button events. --- tests/test_niko.py | 253 +++++++++++++++++++++++++++++++++++++++ zhaquirks/niko/switch.py | 133 ++++++++++++++++++++ 2 files changed, 386 insertions(+) diff --git a/tests/test_niko.py b/tests/test_niko.py index 9eb61f2b36..414bafe62b 100644 --- a/tests/test_niko.py +++ b/tests/test_niko.py @@ -3,6 +3,7 @@ from unittest import mock import pytest +from zigpy import types as t from zigpy.zcl import foundation import zhaquirks @@ -52,6 +53,19 @@ def test_state_cluster(self, zigpy_device_from_v2_quirk, switch): 0x0002: foundation.ZCLAttributeAccess.from_str("p"), } + def test_buttons_cluster(self, zigpy_device_from_v2_quirk, switch): + """Test the buttons cluster.""" + device = zigpy_device_from_v2_quirk(*switch[0], **switch[1]) + cluster = device.endpoints[1].buttons + + assert {attr.id: attr.access for attr in cluster.AttributeDefs} == { + # Button states + 0x0001: foundation.ZCLAttributeAccess.from_str("rp"), + 0x0002: foundation.ZCLAttributeAccess.from_str("rp"), + 0x0003: foundation.ZCLAttributeAccess.from_str("rp"), + 0x0004: foundation.ZCLAttributeAccess.from_str("rp"), + } + @pytest.mark.parametrize("switch", [SWITCH_SINGLE, SWITCH_DOUBLE]) # pylint: disable=R0903 class TestConfiguration: @@ -100,3 +114,242 @@ async def test_apply_custom_configuration( assert ( calls[0].kwargs["data"] == b"\x04\x5f\x12\x02\x02\x01\x00\x18\x1e" ) + + class TestDeviceAutomationTriggers: + """Test whether all device automation triggers are present.""" + + def test_single_switch(self, zigpy_device_from_v2_quirk): + """Test device automation triggers for 552-721X1.""" + device = zigpy_device_from_v2_quirk(*SWITCH_SINGLE[0], **SWITCH_SINGLE[1]) + triggers = device.device_automation_triggers + + assert triggers == { + # Button 1 + ("remote_button_short_press", "button_1"): { + "command": "button_1_remote_button_short_press" + }, + ("remote_button_short_release", "button_1"): { + "command": "button_1_remote_button_short_release" + }, + ("remote_button_long_press", "button_1"): { + "command": "button_1_remote_button_long_press" + }, + ("remote_button_long_release", "button_1"): { + "command": "button_1_remote_button_long_release" + }, + # Button 2 + ("remote_button_short_press", "button_2"): { + "command": "button_2_remote_button_short_press" + }, + ("remote_button_short_release", "button_2"): { + "command": "button_2_remote_button_short_release" + }, + ("remote_button_long_press", "button_2"): { + "command": "button_2_remote_button_long_press" + }, + ("remote_button_long_release", "button_2"): { + "command": "button_2_remote_button_long_release" + }, + } + + def test_double_switch(self, zigpy_device_from_v2_quirk): + """Test device automation triggers for 552-721X2.""" + device = zigpy_device_from_v2_quirk(*SWITCH_DOUBLE[0], **SWITCH_DOUBLE[1]) + triggers = device.device_automation_triggers + + assert triggers == { + # Button 1 + ("remote_button_short_press", "button_1"): { + "command": "button_1_remote_button_short_press" + }, + ("remote_button_short_release", "button_1"): { + "command": "button_1_remote_button_short_release" + }, + ("remote_button_long_press", "button_1"): { + "command": "button_1_remote_button_long_press" + }, + ("remote_button_long_release", "button_1"): { + "command": "button_1_remote_button_long_release" + }, + # Button 2 + ("remote_button_short_press", "button_2"): { + "command": "button_2_remote_button_short_press" + }, + ("remote_button_short_release", "button_2"): { + "command": "button_2_remote_button_short_release" + }, + ("remote_button_long_press", "button_2"): { + "command": "button_2_remote_button_long_press" + }, + ("remote_button_long_release", "button_2"): { + "command": "button_2_remote_button_long_release" + }, + # Button 3 + ("remote_button_short_press", "button_3"): { + "command": "button_3_remote_button_short_press" + }, + ("remote_button_short_release", "button_3"): { + "command": "button_3_remote_button_short_release" + }, + ("remote_button_long_press", "button_3"): { + "command": "button_3_remote_button_long_press" + }, + ("remote_button_long_release", "button_3"): { + "command": "button_3_remote_button_long_release" + }, + # Button 4 + ("remote_button_short_press", "button_4"): { + "command": "button_4_remote_button_short_press" + }, + ("remote_button_short_release", "button_4"): { + "command": "button_4_remote_button_short_release" + }, + ("remote_button_long_press", "button_4"): { + "command": "button_4_remote_button_long_press" + }, + ("remote_button_long_release", "button_4"): { + "command": "button_4_remote_button_long_release" + }, + } + + @pytest.mark.parametrize("switch", [SWITCH_SINGLE, SWITCH_DOUBLE]) + class TestButtonState: + """Test button state and associated events.""" + + @pytest.mark.parametrize( + "case", + [ + # Button 1 + [(0x00000, {})], + [(0x00010, {0x0001})], + [(0x00040, {})], + # Button 2 + [(0x00000, {})], + [(0x00100, {0x0002})], + [(0x00400, {})], + # Button 3 + [(0x00000, {})], + [(0x01000, {0x0003})], + [(0x04000, {})], + # Button 4 + [(0x00000, {})], + [(0x10000, {0x0004})], + [(0x40000, {})], + # Mixed + [ + (0x00000, {}), + (0x44440, {}), + (0x00010, {0x0001}), + (0x00110, {0x0001, 0x0002}), + (0x00140, {0x0002}), + (0x01100, {0x0002, 0x0003}), + (0x01400, {0x0003}), + (0x11000, {0x0003, 0x0004}), + (0x14000, {0x0004}), + (0x40000, {}), + (0x00110, {0x0001, 0x0002}), + (0x00440, {}), + (0x11110, {0x0001, 0x0002, 0x0003, 0x0004}), + (0x44440, {}), + ], + ], + ) + async def test_read(self, zigpy_device_from_v2_quirk, switch, case): + """Test whether button state changes cause attribute changes in the buttons cluster.""" + device = zigpy_device_from_v2_quirk(*switch[0], **switch[1]) + + state_cluster = device.endpoints[1].niko_state + buttons_cluster = device.endpoints[1].buttons + + with mock.patch.object( + state_cluster.endpoint, "request", mock.AsyncMock() + ) as request: + request.return_value = (foundation.Status.SUCCESS, "done") + + for state, on_buttons in case: + state_cluster.update_attribute(0x0002, state) + + attrs, _ = await buttons_cluster.read_attributes( + [ + 0x0001, + 0x0002, + 0x0003, + 0x0004, + ] + ) + assert attrs[0x0001] == t.Bool(0x0001 in on_buttons) + assert attrs[0x0002] == t.Bool(0x0002 in on_buttons) + assert attrs[0x0003] == t.Bool(0x0003 in on_buttons) + assert attrs[0x0004] == t.Bool(0x0004 in on_buttons) + + @pytest.mark.parametrize( + "case", + [ + # Button 1 + [(0x00000, None, None)], + [(0x00010, "button_1", "remote_button_short_press")], + [(0x00040, "button_1", "remote_button_short_release")], + # Button 2 + [(0x00000, None, None)], + [(0x00100, "button_2", "remote_button_short_press")], + [(0x00400, "button_2", "remote_button_short_release")], + # Button 3 + [(0x00000, None, None)], + [(0x01000, "button_3", "remote_button_short_press")], + [(0x04000, "button_3", "remote_button_short_release")], + # Button 4 + [(0x00000, None, None)], + [(0x10000, "button_4", "remote_button_short_press")], + [(0x40000, "button_4", "remote_button_short_release")], + # Repeated same state + [ + (0x00000, None, None), + (0x00010, "button_1", "remote_button_short_press"), + (0x00010, None, None), + (0x00010, None, None), + (0x00000, None, None), + (0x00010, None, None), + (0x00040, "button_1", "remote_button_short_release"), + (0x00040, None, None), + ], + # Multi-button presses + [ + (0x00000, None, None), + (0x00010, "button_1", "remote_button_short_press"), + (0x00110, "button_2", "remote_button_short_press"), + (0x01110, "button_3", "remote_button_short_press"), + (0x11110, "button_4", "remote_button_short_press"), + (0x41110, "button_4", "remote_button_short_release"), + (0x04110, "button_3", "remote_button_short_release"), + (0x00410, "button_2", "remote_button_short_release"), + (0x00040, "button_1", "remote_button_short_release"), + (0x00000, None, None), + ], + ], + ) + def test_events(self, zigpy_device_from_v2_quirk, switch, case): + """Test whether button state changes cause events.""" + device = zigpy_device_from_v2_quirk(*switch[0], **switch[1]) + + state_cluster = device.endpoints[1].niko_state + + buttons_cluster = device.endpoints[1].buttons + listener = mock.MagicMock() + buttons_cluster.add_listener(listener) + + for state, button, press_type in case: + listener.reset_mock() + state_cluster.update_attribute(0x0002, state) + + # Test whether the event was emitted + if not button: + assert listener.zha_send_event.call_count == 0 + else: + assert listener.zha_send_event.call_count == 1 + assert listener.zha_send_event.call_args_list[0] == mock.call( + f"{button}_{press_type}", + { + "button": button, + "press_type": press_type, + }, + ) diff --git a/zhaquirks/niko/switch.py b/zhaquirks/niko/switch.py index f4cdc998b9..ed90d5260d 100644 --- a/zhaquirks/niko/switch.py +++ b/zhaquirks/niko/switch.py @@ -6,6 +6,21 @@ from zigpy.zcl.clusters.homeautomation import Diagnostic from zigpy.zcl.foundation import BaseAttributeDefs, ZCLAttributeDef +from zhaquirks import Bus, LocalDataCluster +from zhaquirks.const import ( + BUTTON, + BUTTON_1, + BUTTON_2, + BUTTON_3, + BUTTON_4, + COMMAND, + LONG_PRESS, + LONG_RELEASE, + PRESS_TYPE, + SHORT_PRESS, + SHORT_RELEASE, + ZHA_SEND_EVENT, +) from zhaquirks.niko import NIKO, NIKO_MFG_CODE @@ -14,10 +29,21 @@ class NikoCluster(CustomCluster): attr_config = {} + def __init__(self, *args, **kwargs): + """Initialize the cluster.""" + super().__init__(*args, **kwargs) + self.attribute_bus = self.endpoint.device.attribute_bus + self.attribute_bus.add_listener(self) + async def apply_custom_configuration(self, *args, **kwargs): """Apply custom configuration.""" await self.write_attributes(self.attr_config, manufacturer=NIKO_MFG_CODE) + def _update_attribute(self, attrid, value): + super()._update_attribute(attrid, value) + attrid = self.attributes[attrid].name + self.attribute_bus.listener_event(f"{attrid}_updated", value) + class ButtonsDefaultAction(t.enum8): """Whether pressing buttons automatically triggers the corresponding switch.""" @@ -185,9 +211,96 @@ class AttributeDefs(BaseAttributeDefs): } +class ButtonsCluster(NikoCluster, LocalDataCluster): + """Virtual cluster to manage individual button state and attributes.""" + + cluster_id = 0xFC02 + ep_attribute = "buttons" + + # pylint: disable=R0903 + class AttributeDefs(BaseAttributeDefs): + """Attributes for button configuration.""" + + # Button press state + button_1_pressed = ZCLAttributeDef( + id=0x0001, + type=t.Bool, + access="rp", + is_manufacturer_specific=True, + ) + button_2_pressed = ZCLAttributeDef( + id=0x0002, + type=t.Bool, + access="rp", + is_manufacturer_specific=True, + ) + button_3_pressed = ZCLAttributeDef( + id=0x0003, + type=t.Bool, + access="rp", + is_manufacturer_specific=True, + ) + button_4_pressed = ZCLAttributeDef( + id=0x0004, + type=t.Bool, + access="rp", + is_manufacturer_specific=True, + ) + + _VALID_ATTRIBUTES = [attr.id for attr in AttributeDefs] + + PRESSED_ATTRIBUTES = [ + AttributeDefs.button_1_pressed, + AttributeDefs.button_2_pressed, + AttributeDefs.button_3_pressed, + AttributeDefs.button_4_pressed, + ] + + def __init__(self, *args, **kwargs): + """Initialize the cluster.""" + super().__init__(*args, **kwargs) + self.presses = [0] * len(self.PRESSED_ATTRIBUTES) + for attr in self.attributes: + self.update_attribute(attr, t.Bool.false) + + def buttons_state_updated(self, state): + """Emit events based on button state changes.""" + for b, button in enumerate(self.endpoint.device.button_keys): + code = state >> (4 * b + 4) & 0xF + press = self.endpoint.device.press_types.get(code) + if press and press != self.presses[b]: + self.presses[b] = press + down = t.Bool(press in {SHORT_PRESS, LONG_PRESS}) + self._update_attribute(self.PRESSED_ATTRIBUTES[b].id, down) + self.listener_event( + ZHA_SEND_EVENT, + f"{button}_{press}", + {BUTTON: button, PRESS_TYPE: press}, + ) + + class NikoSwitch(CustomDeviceV2): """Base class for Niko Connected Switches.""" + button_keys = [ + BUTTON_1, + BUTTON_2, + BUTTON_3, + BUTTON_4, + ] + + press_types = { + 0x1: SHORT_PRESS, + 0x4: SHORT_RELEASE, + 0x2: LONG_PRESS, + 0x3: LONG_RELEASE, + } + + def __init__(self, *args, **kwargs): + """Initialize the switch.""" + self.attribute_bus = Bus() + super().__init__(*args, **kwargs) + class NikoSwitchSingle(NikoSwitch): """Niko Connected Switch 552-721X1.""" @@ -226,6 +339,26 @@ def __init__(self, device_class): # Cluster replacements self.replaces(NikoConfigCluster, endpoint_id=1) self.replaces(NikoStateCluster, endpoint_id=1) + self.adds(ButtonsCluster) + + # Button triggers + self.device_automation_triggers( + { + (press_type, button_key): {COMMAND: f"{button_key}_{press_type}"} + for press_type in device_class.press_types.values() + for button_key in device_class.button_keys[: len(device_class.buttons)] + } + ) + + # Button entities + for b, name in enumerate(device_class.buttons): + self.binary_sensor( + ButtonsCluster.PRESSED_ATTRIBUTES[b].name, + ButtonsCluster.cluster_id, + entity_type=EntityType.STANDARD, + translation_key=device_class.button_keys[b], + fallback_name=name, + ) # Configuration entities self.switch(