diff --git a/zigpy_zigate/api.py b/zigpy_zigate/api.py index 178b065..2b333a8 100644 --- a/zigpy_zigate/api.py +++ b/zigpy_zigate/api.py @@ -8,6 +8,7 @@ import serial import zigpy.exceptions +import zigpy.types import zigpy_zigate.config import zigpy_zigate.uart @@ -39,6 +40,7 @@ class CommandId(enum.IntEnum): MANAGEMENT_NETWORK_UPDATE_REQUEST = 0x004A SEND_RAW_APS_DATA_PACKET = 0x0530 AHI_SET_TX_POWER = 0x0806 + GET_NETWORK_KEY = 0x0054 class ResponseId(enum.IntEnum): @@ -65,6 +67,7 @@ class ResponseId(enum.IntEnum): APS_DATA_CONFIRM_FAILED = 0x8702 AHI_SET_TX_POWER_RSP = 0x8806 EXTENDED_ERROR = 0x9999 + GET_NETWORK_KEY_LIST = 0x8054 class SendSecurity(t.uint8_t, enum.Enum): @@ -142,6 +145,7 @@ class FactoryNewRestartStatus(t.uint8_t, enum.Enum): ), ResponseId.AHI_SET_TX_POWER_RSP: (t.uint8_t,), ResponseId.EXTENDED_ERROR: (t.Status,), + ResponseId.GET_NETWORK_KEY_LIST: (zigpy.types.KeyData,), } COMMANDS = { @@ -212,6 +216,10 @@ class CommandError(zigpy.exceptions.APIException): pass +class CommandNotSupportedError(CommandError): + pass + + class ZiGate: def __init__(self, device_config: Dict[str, Any]): self._app = None @@ -568,6 +576,16 @@ def handle_callback(self, *args): except Exception as e: LOGGER.exception("Exception running handler", exc_info=e) + async def get_network_key(self): + rsp, _ = await self.command( + CommandId.GET_NETWORK_KEY, wait_response=ResponseId.GET_NETWORK_KEY_LIST + ) + + if rsp[0] == t.Status.UnhandledCommand: + raise CommandNotSupportedError() + + return rsp[0] + @classmethod async def probe(cls, device_config: Dict[str, Any]) -> bool: """Probe port for the device presence.""" diff --git a/zigpy_zigate/zigbee/application.py b/zigpy_zigate/zigbee/application.py index 32d8e3a..e389494 100644 --- a/zigpy_zigate/zigbee/application.py +++ b/zigpy_zigate/zigbee/application.py @@ -14,7 +14,13 @@ import zigpy.zdo from zigpy_zigate import common as c, types as t -from zigpy_zigate.api import PDM_EVENT, NoResponseError, ResponseId, ZiGate +from zigpy_zigate.api import ( + PDM_EVENT, + CommandNotSupportedError, + NoResponseError, + ResponseId, + ZiGate, +) from zigpy_zigate.config import ( CONF_DEVICE, CONF_DEVICE_PATH, @@ -91,6 +97,12 @@ async def load_network_info(self, *, load_devices: bool = False): zigpy.types.uint64_t(network_state[3]).serialize() ) + try: + network_key_data = await self._api.get_network_key() + network_key = zigpy.state.Key(key=network_key_data) + except CommandNotSupportedError: + network_key = zigpy.state.Key() + self.state.network_info = zigpy.state.NetworkInfo( source=f"zigpy-zigate@{LIB_VERSION}", extended_pan_id=epid, @@ -100,8 +112,7 @@ async def load_network_info(self, *, load_devices: bool = False): channel=network_state[4], channel_mask=zigpy.types.Channels.from_channel_list([network_state[4]]), security_level=5, - # TODO: is it possible to read keys? - # network_key=zigpy.state.Key(), + network_key=network_key, # tc_link_key=zigpy.state.Key(), children=[], key_table=[],