From 36601ba64577e72bc1391b3dbc3536e7e07c11c2 Mon Sep 17 00:00:00 2001 From: Rob Berwick Date: Sat, 30 Nov 2024 13:42:41 +0000 Subject: [PATCH 1/5] refactor: rename backend methods for clarity Rename `find_blinksticks` to `get_attached_blinkstick_devices`, and `_refresh_device` to `_refresh_attached_blinkstick_device` --- src/blinkstick/backends/base.py | 4 ++-- src/blinkstick/backends/unix_like.py | 12 +++++++----- src/blinkstick/backends/win32.py | 10 ++++++---- src/blinkstick/blinkstick.py | 4 ++-- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/blinkstick/backends/base.py b/src/blinkstick/backends/base.py index f85fa20..07d4529 100644 --- a/src/blinkstick/backends/base.py +++ b/src/blinkstick/backends/base.py @@ -15,12 +15,12 @@ def __init__(self): self.serial = None @abstractmethod - def _refresh_device(self): + def _refresh_attached_blinkstick_device(self): raise NotImplementedError @staticmethod @abstractmethod - def find_blinksticks(find_all: bool = True) -> list[T] | None: + def get_attached_blinkstick_devices(find_all: bool = True) -> list[T] | None: raise NotImplementedError @staticmethod diff --git a/src/blinkstick/backends/unix_like.py b/src/blinkstick/backends/unix_like.py index e593184..039e2eb 100644 --- a/src/blinkstick/backends/unix_like.py +++ b/src/blinkstick/backends/unix_like.py @@ -30,7 +30,7 @@ def open_device(self) -> None: except usb.core.USBError as e: raise BlinkStickException("Could not detach kernel driver: %s" % str(e)) - def _refresh_device(self): + def _refresh_attached_blinkstick_device(self): if not self.serial: return False if devices := self.find_by_serial(self.serial): @@ -39,14 +39,16 @@ def _refresh_device(self): return True @staticmethod - def find_blinksticks(find_all: bool = True) -> list[usb.core.Device] | None: + def get_attached_blinkstick_devices( + find_all: bool = True, + ) -> list[usb.core.Device] | None: return usb.core.find( find_all=find_all, idVendor=VENDOR_ID, idProduct=PRODUCT_ID ) @staticmethod def find_by_serial(serial: str) -> list[usb.core.Device] | None: - found_devices = UnixLikeBackend.find_blinksticks() or [] + found_devices = UnixLikeBackend.get_attached_blinkstick_devices() or [] for d in found_devices: try: if usb.util.get_string(d, 3, 1033) == serial: @@ -73,7 +75,7 @@ def control_transfer( # Could not communicate with BlinkStick backend # attempt to find it again based on serial - if self._refresh_device(): + if self._refresh_attached_blinkstick_device(): return self.device.ctrl_transfer( bmRequestType, bRequest, wValue, wIndex, data_or_wLength ) @@ -103,7 +105,7 @@ def _usb_get_string(self, index: int) -> str: # Could not communicate with BlinkStick backend # attempt to find it again based on serial - if self._refresh_device(): + if self._refresh_attached_blinkstick_device(): return str(usb.util.get_string(self.device, index, 1033)) else: raise BlinkStickException( diff --git a/src/blinkstick/backends/win32.py b/src/blinkstick/backends/win32.py index 86a7ca4..05a0079 100644 --- a/src/blinkstick/backends/win32.py +++ b/src/blinkstick/backends/win32.py @@ -25,7 +25,7 @@ def __init__(self, device=None): @staticmethod def find_by_serial(serial: str) -> list[hid.HidDevice] | None: - found_devices = Win32Backend.find_blinksticks() or [] + found_devices = Win32Backend.get_attached_blinkstick_devices() or [] devices = [d for d in found_devices if d.serial_number == serial] if len(devices) > 0: @@ -33,7 +33,7 @@ def find_by_serial(serial: str) -> list[hid.HidDevice] | None: return None - def _refresh_device(self): + def _refresh_attached_blinkstick_device(self): # TODO This is weird semantics. fix up return values to be more sensible if not self.serial: return False @@ -44,7 +44,9 @@ def _refresh_device(self): return True @staticmethod - def find_blinksticks(find_all: bool = True) -> list[hid.HidDevice] | None: + def get_attached_blinkstick_devices( + find_all: bool = True, + ) -> list[hid.HidDevice] | None: devices = hid.HidDeviceFilter( vendor_id=VENDOR_ID, product_id=PRODUCT_ID ).get_devices() @@ -69,7 +71,7 @@ def control_transfer( ) data[0] = wValue if not self.device.send_feature_report(data): - if self._refresh_device(): + if self._refresh_attached_blinkstick_device(): self.device.send_feature_report(data) else: raise BlinkStickException( diff --git a/src/blinkstick/blinkstick.py b/src/blinkstick/blinkstick.py index 76b036f..d628d46 100644 --- a/src/blinkstick/blinkstick.py +++ b/src/blinkstick/blinkstick.py @@ -1403,7 +1403,7 @@ def find_all() -> list[BlinkStick]: @return: a list of BlinkStick objects or None if no devices found """ result: list[BlinkStick] = [] - if (found_devices := USBBackend.find_blinksticks()) is None: + if (found_devices := USBBackend.get_attached_blinkstick_devices()) is None: return result for d in found_devices: result.extend([BlinkStick(device=d)]) @@ -1418,7 +1418,7 @@ def find_first() -> BlinkStick | None: @rtype: BlinkStick @return: BlinkStick object or None if no devices are found """ - d = USBBackend.find_blinksticks(find_all=False) + d = USBBackend.get_attached_blinkstick_devices(find_all=False) if d: return BlinkStick(device=d) From 9bb11f4f70ecab08859932c450da9436d7fa20cf Mon Sep 17 00:00:00 2001 From: Rob Berwick Date: Sat, 30 Nov 2024 14:52:36 +0000 Subject: [PATCH 2/5] refactor: tidy up semantics of `get_attached_blinkstick_devices` Always return a list from the usb backend, so if no blinkstick devices are detected, the list should be empty --- src/blinkstick/backends/base.py | 2 +- src/blinkstick/backends/unix_like.py | 9 +++++---- src/blinkstick/backends/win32.py | 8 +++----- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/blinkstick/backends/base.py b/src/blinkstick/backends/base.py index 07d4529..0b90ef5 100644 --- a/src/blinkstick/backends/base.py +++ b/src/blinkstick/backends/base.py @@ -20,7 +20,7 @@ def _refresh_attached_blinkstick_device(self): @staticmethod @abstractmethod - def get_attached_blinkstick_devices(find_all: bool = True) -> list[T] | None: + def get_attached_blinkstick_devices(find_all: bool = True) -> list[T]: raise NotImplementedError @staticmethod diff --git a/src/blinkstick/backends/unix_like.py b/src/blinkstick/backends/unix_like.py index 039e2eb..335bc83 100644 --- a/src/blinkstick/backends/unix_like.py +++ b/src/blinkstick/backends/unix_like.py @@ -41,14 +41,15 @@ def _refresh_attached_blinkstick_device(self): @staticmethod def get_attached_blinkstick_devices( find_all: bool = True, - ) -> list[usb.core.Device] | None: - return usb.core.find( - find_all=find_all, idVendor=VENDOR_ID, idProduct=PRODUCT_ID + ) -> list[usb.core.Device]: + return ( + usb.core.find(find_all=find_all, idVendor=VENDOR_ID, idProduct=PRODUCT_ID) + or [] ) @staticmethod def find_by_serial(serial: str) -> list[usb.core.Device] | None: - found_devices = UnixLikeBackend.get_attached_blinkstick_devices() or [] + found_devices = UnixLikeBackend.get_attached_blinkstick_devices() for d in found_devices: try: if usb.util.get_string(d, 3, 1033) == serial: diff --git a/src/blinkstick/backends/win32.py b/src/blinkstick/backends/win32.py index 05a0079..390b1ca 100644 --- a/src/blinkstick/backends/win32.py +++ b/src/blinkstick/backends/win32.py @@ -46,16 +46,14 @@ def _refresh_attached_blinkstick_device(self): @staticmethod def get_attached_blinkstick_devices( find_all: bool = True, - ) -> list[hid.HidDevice] | None: + ) -> list[hid.HidDevice]: devices = hid.HidDeviceFilter( vendor_id=VENDOR_ID, product_id=PRODUCT_ID ).get_devices() if find_all: return devices - elif len(devices) > 0: - return devices[0] - else: - return None + + return devices[:1] def control_transfer( self, bmRequestType, bRequest, wValue, wIndex, data_or_wLength From 83f07c8b8217a6c22a41ed5044522872607ed700 Mon Sep 17 00:00:00 2001 From: Rob Berwick Date: Sat, 30 Nov 2024 15:18:17 +0000 Subject: [PATCH 3/5] chore: add BlinkStickDevice dataclass --- src/blinkstick/devices/device.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 src/blinkstick/devices/device.py diff --git a/src/blinkstick/devices/device.py b/src/blinkstick/devices/device.py new file mode 100644 index 0000000..410f8c9 --- /dev/null +++ b/src/blinkstick/devices/device.py @@ -0,0 +1,15 @@ +from dataclasses import dataclass +from typing import Generic, TypeVar + +T = TypeVar("T") + + +@dataclass +class BlinkStickDevice(Generic[T]): + """A BlinkStick device representation""" + + raw_device: T + serial: str + manufacturer: str + version_attribute: int + description: str From 5a9fdfa94ccd05e7f822bb71a755653296862572 Mon Sep 17 00:00:00 2001 From: Rob Berwick Date: Sat, 30 Nov 2024 15:56:39 +0000 Subject: [PATCH 4/5] refactor: Use BlinkStickDevice class Use the BlinkStickDevice class to retrieve readonly device data up front, and persist it in the BlinkStick wrapper class --- src/blinkstick/backends/base.py | 23 ++++----- src/blinkstick/backends/unix_like.py | 70 +++++++++++++++------------- src/blinkstick/backends/win32.py | 61 ++++++++++++------------ src/blinkstick/blinkstick.py | 11 +++-- 4 files changed, 86 insertions(+), 79 deletions(-) diff --git a/src/blinkstick/backends/base.py b/src/blinkstick/backends/base.py index 0b90ef5..452fe54 100644 --- a/src/blinkstick/backends/base.py +++ b/src/blinkstick/backends/base.py @@ -4,12 +4,15 @@ from typing import TypeVar, Generic +from blinkstick.devices.device import BlinkStickDevice + T = TypeVar("T") class BaseBackend(ABC, Generic[T]): serial: str | None + blinkstick_device: BlinkStickDevice[T] def __init__(self): self.serial = None @@ -20,12 +23,14 @@ def _refresh_attached_blinkstick_device(self): @staticmethod @abstractmethod - def get_attached_blinkstick_devices(find_all: bool = True) -> list[T]: + def get_attached_blinkstick_devices( + find_all: bool = True, + ) -> list[BlinkStickDevice[T]]: raise NotImplementedError @staticmethod @abstractmethod - def find_by_serial(serial: str) -> list[T] | None: + def find_by_serial(serial: str) -> list[BlinkStickDevice[T]] | None: raise NotImplementedError @abstractmethod @@ -39,18 +44,14 @@ def control_transfer( ): raise NotImplementedError - @abstractmethod def get_serial(self) -> str: - raise NotImplementedError + return self.blinkstick_device.serial - @abstractmethod def get_manufacturer(self) -> str: - raise NotImplementedError + return self.blinkstick_device.manufacturer - @abstractmethod def get_version_attribute(self) -> int: - raise NotImplementedError + return self.blinkstick_device.version_attribute - @abstractmethod - def get_description(self) -> str: - raise NotImplementedError + def get_description(self): + return self.blinkstick_device.description diff --git a/src/blinkstick/backends/unix_like.py b/src/blinkstick/backends/unix_like.py index 335bc83..5b600d5 100644 --- a/src/blinkstick/backends/unix_like.py +++ b/src/blinkstick/backends/unix_like.py @@ -5,58 +5,70 @@ from blinkstick.constants import VENDOR_ID, PRODUCT_ID from blinkstick.backends.base import BaseBackend +from blinkstick.devices.device import BlinkStickDevice from blinkstick.exceptions import BlinkStickException class UnixLikeBackend(BaseBackend[usb.core.Device]): serial: str - device: usb.core.Device + blinkstick_device: BlinkStickDevice[usb.core.Device] def __init__(self, device=None): - self.device = device + self.blinkstick_device = device super().__init__() if device: self.open_device() self.serial = self.get_serial() def open_device(self) -> None: - if self.device is None: + if self.blinkstick_device is None: raise BlinkStickException("Could not find BlinkStick...") - if self.device.is_kernel_driver_active(0): + if self.blinkstick_device.raw_device.is_kernel_driver_active(0): try: - self.device.detach_kernel_driver(0) + self.blinkstick_device.raw_device.detach_kernel_driver(0) except usb.core.USBError as e: raise BlinkStickException("Could not detach kernel driver: %s" % str(e)) def _refresh_attached_blinkstick_device(self): - if not self.serial: + if not self.blinkstick_device: return False - if devices := self.find_by_serial(self.serial): - self.device = devices[0] + if devices := self.find_by_serial(self.blinkstick_device.serial): + self.blinkstick_device = devices[0] self.open_device() return True @staticmethod def get_attached_blinkstick_devices( find_all: bool = True, - ) -> list[usb.core.Device]: - return ( + ) -> list[BlinkStickDevice[usb.core.Device]]: + raw_devices = ( usb.core.find(find_all=find_all, idVendor=VENDOR_ID, idProduct=PRODUCT_ID) or [] ) + return [ + # TODO: refactor this to DRY up the usb.util.get_string calls + # note that we can't use _usb_get_string here because we're not in an instance method + # and we don't have a BlinkStickDevice instance to call it on + # until then we'll just have to live with the duplication, and the fact that we're not able + # to handle USB errors in the same way as we do in the instance methods + BlinkStickDevice( + raw_device=device, + serial=str(usb.util.get_string(device, 3, 1033)), + manufacturer=str(usb.util.get_string(device, 1, 1033)), + version_attribute=device.bcdDevice, + description=str(usb.util.get_string(device, 2, 1033)), + ) + for device in raw_devices + ] @staticmethod - def find_by_serial(serial: str) -> list[usb.core.Device] | None: + def find_by_serial(serial: str) -> list[BlinkStickDevice[usb.core.Device]] | None: found_devices = UnixLikeBackend.get_attached_blinkstick_devices() for d in found_devices: - try: - if usb.util.get_string(d, 3, 1033) == serial: - devices = [d] - return devices - except Exception as e: - print("{0}".format(e)) + if d.serial == serial: + return [d] return None @@ -69,7 +81,7 @@ def control_transfer( data_or_wLength: bytes | int, ): try: - return self.device.ctrl_transfer( + return self.blinkstick_device.raw_device.ctrl_transfer( bmRequestType, bRequest, wValue, wIndex, data_or_wLength ) except usb.USBError: @@ -77,7 +89,7 @@ def control_transfer( # attempt to find it again based on serial if self._refresh_attached_blinkstick_device(): - return self.device.ctrl_transfer( + return self.blinkstick_device.raw_device.ctrl_transfer( bmRequestType, bRequest, wValue, wIndex, data_or_wLength ) else: @@ -87,27 +99,19 @@ def control_transfer( ) ) - def get_serial(self) -> str: - return self._usb_get_string(3) - - def get_manufacturer(self) -> str: - return self._usb_get_string(1) - - def get_version_attribute(self) -> int: - return int(self.device.bcdDevice) - - def get_description(self): - return self._usb_get_string(2) - def _usb_get_string(self, index: int) -> str: try: - return str(usb.util.get_string(self.device, index, 1033)) + return str( + usb.util.get_string(self.blinkstick_device.raw_device, index, 1033) + ) except usb.USBError: # Could not communicate with BlinkStick backend # attempt to find it again based on serial if self._refresh_attached_blinkstick_device(): - return str(usb.util.get_string(self.device, index, 1033)) + return str( + usb.util.get_string(self.blinkstick_device.raw_device, index, 1033) + ) else: raise BlinkStickException( "Could not communicate with BlinkStick {0} - it may have been removed".format( diff --git a/src/blinkstick/backends/win32.py b/src/blinkstick/backends/win32.py index 390b1ca..7f4754c 100644 --- a/src/blinkstick/backends/win32.py +++ b/src/blinkstick/backends/win32.py @@ -7,29 +7,29 @@ from blinkstick.constants import VENDOR_ID, PRODUCT_ID from blinkstick.backends.base import BaseBackend +from blinkstick.devices.device import BlinkStickDevice from blinkstick.exceptions import BlinkStickException class Win32Backend(BaseBackend[hid.HidDevice]): serial: str - device: hid.HidDevice + blinkstick_device: BlinkStickDevice[hid.HidDevice] reports: list[hid.core.HidReport] - def __init__(self, device=None): + def __init__(self, device: BlinkStickDevice[hid.HidDevice]): super().__init__() - self.device = device + self.blinkstick_device = device if device: - self.device.open() - self.reports = self.device.find_feature_reports() + self.blinkstick_device.raw_device.open() + self.reports = self.blinkstick_device.raw_device.find_feature_reports() self.serial = self.get_serial() @staticmethod - def find_by_serial(serial: str) -> list[hid.HidDevice] | None: - found_devices = Win32Backend.get_attached_blinkstick_devices() or [] - devices = [d for d in found_devices if d.serial_number == serial] - - if len(devices) > 0: - return devices + def find_by_serial(serial: str) -> list[BlinkStickDevice[hid.HidDevice]] | None: + found_devices = Win32Backend.get_attached_blinkstick_devices() + for d in found_devices: + if d.serial == serial: + return [d] return None @@ -38,22 +38,33 @@ def _refresh_attached_blinkstick_device(self): if not self.serial: return False if devices := self.find_by_serial(self.serial): - self.device = devices[0] - self.device.open() - self.reports = self.device.find_feature_reports() + self.blinkstick_device = devices[0] + self.blinkstick_device.raw_device.open() + self.reports = self.blinkstick_device.raw_device.find_feature_reports() return True @staticmethod def get_attached_blinkstick_devices( find_all: bool = True, - ) -> list[hid.HidDevice]: + ) -> list[BlinkStickDevice[hid.HidDevice]]: devices = hid.HidDeviceFilter( vendor_id=VENDOR_ID, product_id=PRODUCT_ID ).get_devices() + + blinkstick_devices = [ + BlinkStickDevice( + raw_device=device, + serial=device.serial_number, + manufacturer=device.vendor_name, + version_attribute=device.version_number, + description=device.product_name, + ) + for device in devices + ] if find_all: - return devices + return blinkstick_devices - return devices[:1] + return blinkstick_devices[:1] def control_transfer( self, bmRequestType, bRequest, wValue, wIndex, data_or_wLength @@ -68,9 +79,9 @@ def control_transfer( *[c_ubyte(c) for c in data_or_wLength] ) data[0] = wValue - if not self.device.send_feature_report(data): + if not self.blinkstick_device.raw_device.send_feature_report(data): if self._refresh_attached_blinkstick_device(): - self.device.send_feature_report(data) + self.blinkstick_device.raw_device.send_feature_report(data) else: raise BlinkStickException( "Could not communicate with BlinkStick {0} - it may have been removed".format( @@ -80,15 +91,3 @@ def control_transfer( elif bmRequestType == 0x80 | 0x20: return self.reports[wValue - 1].get() - - def get_serial(self) -> str: - return str(self.device.serial_number) - - def get_manufacturer(self) -> str: - return str(self.device.vendor_name) - - def get_version_attribute(self) -> int: - return int(self.device.version_number) - - def get_description(self) -> str: - return str(self.device.product_name) diff --git a/src/blinkstick/blinkstick.py b/src/blinkstick/blinkstick.py index d628d46..173f337 100644 --- a/src/blinkstick/blinkstick.py +++ b/src/blinkstick/blinkstick.py @@ -15,6 +15,7 @@ ColorFormat, ) from blinkstick.constants import VENDOR_ID, PRODUCT_ID, BlinkStickVariant +from blinkstick.devices.device import BlinkStickDevice from blinkstick.exceptions import BlinkStickException from blinkstick.utilities import string_to_info_block_data @@ -51,7 +52,9 @@ class BlinkStick: backend: USBBackend bs_serial: str - def __init__(self, device=None, error_reporting: bool = True): + def __init__( + self, device: BlinkStickDevice | None = None, error_reporting: bool = True + ): """ Constructor for the class. @@ -1418,10 +1421,10 @@ def find_first() -> BlinkStick | None: @rtype: BlinkStick @return: BlinkStick object or None if no devices are found """ - d = USBBackend.get_attached_blinkstick_devices(find_all=False) + blinkstick_devices = USBBackend.get_attached_blinkstick_devices(find_all=False) - if d: - return BlinkStick(device=d) + if blinkstick_devices: + return BlinkStick(device=blinkstick_devices[0]) return None From f06595f9f633bf761cd2db4ca8b0e43deef5d6a7 Mon Sep 17 00:00:00 2001 From: Rob Berwick Date: Sat, 30 Nov 2024 18:41:49 +0000 Subject: [PATCH 5/5] refactor: Use BlinkStickDevice class Use the BlinkStickDevice class to retrieve readonly device data up front, and persist it in the BlinkStick wrapper class --- src/blinkstick/backends/unix_like.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/src/blinkstick/backends/unix_like.py b/src/blinkstick/backends/unix_like.py index 5b600d5..fb85cdb 100644 --- a/src/blinkstick/backends/unix_like.py +++ b/src/blinkstick/backends/unix_like.py @@ -49,10 +49,6 @@ def get_attached_blinkstick_devices( ) return [ # TODO: refactor this to DRY up the usb.util.get_string calls - # note that we can't use _usb_get_string here because we're not in an instance method - # and we don't have a BlinkStickDevice instance to call it on - # until then we'll just have to live with the duplication, and the fact that we're not able - # to handle USB errors in the same way as we do in the instance methods BlinkStickDevice( raw_device=device, serial=str(usb.util.get_string(device, 3, 1033)), @@ -98,23 +94,3 @@ def control_transfer( self.serial ) ) - - def _usb_get_string(self, index: int) -> str: - try: - return str( - usb.util.get_string(self.blinkstick_device.raw_device, index, 1033) - ) - except usb.USBError: - # Could not communicate with BlinkStick backend - # attempt to find it again based on serial - - if self._refresh_attached_blinkstick_device(): - return str( - usb.util.get_string(self.blinkstick_device.raw_device, index, 1033) - ) - else: - raise BlinkStickException( - "Could not communicate with BlinkStick {0} - it may have been removed".format( - self.serial - ) - )