diff --git a/zigpy_zigate/api.py b/zigpy_zigate/api.py index 6f4ab3b..2d0acf8 100644 --- a/zigpy_zigate/api.py +++ b/zigpy_zigate/api.py @@ -8,6 +8,7 @@ import serial import zigpy.exceptions +import zigpy.types import zigpy_zigate.config import zigpy_zigate.uart @@ -39,6 +40,7 @@ class CommandId(enum.IntEnum): MANAGEMENT_NETWORK_UPDATE_REQUEST = 0x004A SEND_RAW_APS_DATA_PACKET = 0x0530 AHI_SET_TX_POWER = 0x0806 + GET_NETWORK_KEY = 0x0054 class ResponseId(enum.IntEnum): @@ -65,6 +67,7 @@ class ResponseId(enum.IntEnum): APS_DATA_CONFIRM_FAILED = 0x8702 AHI_SET_TX_POWER_RSP = 0x8806 EXTENDED_ERROR = 0x9999 + GET_NETWORK_KEY_LIST = 0x8054 class SendSecurity(t.uint8_t, enum.Enum): @@ -142,6 +145,7 @@ class FactoryNewRestartStatus(t.uint8_t, enum.Enum): ), ResponseId.AHI_SET_TX_POWER_RSP: (t.uint8_t,), ResponseId.EXTENDED_ERROR: (t.Status,), + ResponseId.GET_NETWORK_KEY_LIST: (zigpy.types.KeyData,), } COMMANDS = { @@ -212,6 +216,10 @@ class CommandError(zigpy.exceptions.APIException): pass +class CommandNotSupportedError(CommandError): + pass + + class ZiGate: def __init__(self, device_config: Dict[str, Any]): self._app = None @@ -568,6 +576,16 @@ def handle_callback(self, *args): except Exception as e: LOGGER.exception("Exception running handler", exc_info=e) + async def get_network_key(self): + rsp, _ = await self.command( + CommandId.GET_NETWORK_KEY, wait_response=ResponseId.GET_NETWORK_KEY_LIST + ) + + if rsp[0] == t.Status.UnhandledCommand: + raise CommandNotSupportedError() + + return rsp[0] + @classmethod async def probe(cls, device_config: Dict[str, Any]) -> bool: """Probe port for the device presence.""" diff --git a/zigpy_zigate/zigbee/application.py b/zigpy_zigate/zigbee/application.py index b54e5b8..5339c9d 100644 --- a/zigpy_zigate/zigbee/application.py +++ b/zigpy_zigate/zigbee/application.py @@ -20,6 +20,7 @@ CONF_DEVICE_PATH, CONFIG_SCHEMA, SCHEMA_DEVICE, + CommandNotSupportedError, ) LOGGER = logging.getLogger(__name__) @@ -90,6 +91,12 @@ async def load_network_info(self, *, load_devices: bool = False): zigpy.types.uint64_t(network_state[3]).serialize() ) + try: + network_key_data = await self._api.get_network_key() + network_key = zigpy.state.Key(key=network_key_data) + except CommandNotSupportedError: + network_key = zigpy.state.Key() + self.state.network_info = zigpy.state.NetworkInfo( source=f"zigpy-zigate@{importlib.metadata.version('zigpy-zigate')}", extended_pan_id=epid, @@ -99,8 +106,7 @@ async def load_network_info(self, *, load_devices: bool = False): channel=network_state[4], channel_mask=zigpy.types.Channels.from_channel_list([network_state[4]]), security_level=5, - # TODO: is it possible to read keys? - # network_key=zigpy.state.Key(), + network_key=network_key, # tc_link_key=zigpy.state.Key(), children=[], key_table=[],