diff --git a/README-cn.md b/README-cn.md index 755ce72..27421fe 100644 --- a/README-cn.md +++ b/README-cn.md @@ -21,7 +21,8 @@ ```sh # 安装依赖 sudo apt install python3-pip python3-dev libcairo2-dev libgirepository1.0-dev \ - libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev + libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev \ + rfkill meson # 安装 bluescan sudo pip install bluescan @@ -53,7 +54,8 @@ bluescan 基于 Linux 官方的 BlueZ 蓝牙协议栈开发。它仅支持在 Li ```sh sudo apt install python3-pip python3-dev libcairo2-dev libgirepository1.0-dev \ - libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev + libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev \ + rfkill meson ``` 如果后续在[安装](https://github.com/fO-000/bluescan/blob/master/README-cn.md#%E5%AE%89%E8%A3%85) bluescan 时仍遇到错误,请尝试继续安装如下 package 来解决: diff --git a/README.md b/README.md index 5c545fc..cd121e5 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,8 @@ ```sh # Install dependencies sudo apt install python3-pip python3-dev libcairo2-dev libgirepository1.0-dev \ - libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev + libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev \ + rfkill meson # Install bluescan sudo pip install bluescan @@ -53,7 +54,8 @@ bluescan is based on BlueZ, the official Linux Bluetooth stack. It only supports ```sh sudo apt install python3-pip python3-dev libcairo2-dev libgirepository1.0-dev \ - libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev + libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev \ + rfkill meson ``` If you still encounter errors when [installing](https://github.com/fO-000/bluescan#install) bluescan, please try to install the following packages to solve: diff --git a/setup.cfg b/setup.cfg index f2f2c9c..bef528d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -31,12 +31,12 @@ package_dir = packages = find: python_requires = ==3.10.* install_requires = - xpycommon >= 0.0.11 + xpycommon >= 0.0.12 pyclui >= 0.0.17 - bthci >= 0.0.31 - btsmp >= 0.0.10 - btatt >= 0.0.12 - btgatt >= 0.0.16 + bthci >= 0.0.33 + btsmp >= 0.0.11 + btatt >= 0.0.14 + btgatt >= 0.0.17 bluepy >= 1.3.0 halo >= 0.0.31 docopt >= 0.6.2 diff --git a/src/bluescan/__init__.py b/src/bluescan/__init__.py index 93280ec..d68ba91 100644 --- a/src/bluescan/__init__.py +++ b/src/bluescan/__init__.py @@ -1,12 +1,11 @@ #!/usr/bin/env python3 PKG_NAME = 'bluescan' -VERSION = '0.8.4' +VERSION = '0.8.5' DEBUG_VERSION = None from pyclui import Logger, INFO, DEBUG - LOG_LEVEL = INFO logger = Logger(__name__, LOG_LEVEL) @@ -15,12 +14,12 @@ logger.setLevel(LOG_LEVEL) logger.warning("Using the debug version {} of {}".format(DEBUG_VERSION, PKG_NAME)) + import io import pkg_resources from pathlib import Path from bthci import HCI, ControllerErrorCodes -from xpycommon.bluetooth import bd_addr_bytes2str PKG_ROOT = Path(__file__).parent @@ -75,7 +74,7 @@ def __init__(self, iface='hci0'): raise RuntimeError("hci.read_bd_addr() returned, status: 0x{:02x} {}".format( cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) else: - self.hci_bd_addr = bd_addr_bytes2str(cmd_complete.bd_addr).upper() + self.hci_bd_addr = cmd_complete.bd_addr.upper() class ScanResult: diff --git a/src/bluescan/__main__.py b/src/bluescan/__main__.py index c236220..5017306 100644 --- a/src/bluescan/__main__.py +++ b/src/bluescan/__main__.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +import sys import os import subprocess import traceback @@ -10,8 +11,6 @@ from pyclui import Logger, blue from bluepy.btle import BTLEException -from xpycommon.bluetooth import bd_addr_bytes2str - from . import BlueScanner, LOG_LEVEL from .ui import parse_cmdline from .helper import find_rfkill_devid, get_microbit_devpaths @@ -48,44 +47,51 @@ def prepare_hci(iface: str = 'hci0'): logger.debug("Sending hci.inquiry_cancel()") cmd_complete = hci.inquiry_cancel() if cmd_complete.status not in (ControllerErrorCodes.SUCCESS, ControllerErrorCodes.COMMAND_DISALLOWED): - logger.warning("hci.inquiry_cancel() returned, status: 0x{:02x} {}".format( - cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) + logger.warning("Failed to inquiry cancel\n" + " command complete status: 0x{:02x} {}".format( + cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) logger.debug("Sending hci.exit_periodic_inquiry_mode()") cmd_complete = hci.exit_periodic_inquiry_mode() if cmd_complete.status not in (ControllerErrorCodes.SUCCESS, ControllerErrorCodes.COMMAND_DISALLOWED): - logger.warning("hci.exit_periodic_inquiry_mode() returned, status: 0x{:02x} {}".format( - cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) + logger.warning("Failed to exit periodic inquiry mode\n" + " command complete status: 0x{:02x} {}".format( + cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) hci.write_scan_enable() # No scan enabled if cmd_complete.status not in (ControllerErrorCodes.SUCCESS, ControllerErrorCodes.COMMAND_DISALLOWED): - logger.warning("hci.write_scan_enable() returned, status: 0x{:02x} {}".format( - cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) + logger.warning("Failed to write scan enable\n" + " command complete status: 0x{:02x} {}".format( + cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) cmd_complete = hci.le_set_advertising_enable() # Advertising is disabled if cmd_complete.status not in (ControllerErrorCodes.SUCCESS, ControllerErrorCodes.COMMAND_DISALLOWED): - logger.warning("hci.le_set_advertising_enable() returned, status: 0x{:02x} {}".format( - cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) + logger.warning("Failed to le set advertising enable\n" + " command complete status: 0x{:02x} {}".format( + cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) cmd_complete = hci.le_set_scan_enable(False, True) if cmd_complete.status not in (ControllerErrorCodes.SUCCESS, ControllerErrorCodes.COMMAND_DISALLOWED): - logger.warning("hci.le_set_scan_enable() returned, status: 0x{:02x} {}".format( - cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) + logger.warning("Failed to le set scan enable\n" + " command complete status: 0x{:02x} {}".format( + cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) cmd_complete = hci.set_event_filter(0x00) # Clear All Filters if cmd_complete.status != ControllerErrorCodes.SUCCESS: - logger.warning("hci.set_event_filter() returned, status: 0x{:02x} {}".format( - cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) + logger.warning("Failed to set event filter\n" + " command complete status: 0x{:02x} {}".format( + cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) cmd_complete = hci.read_bd_addr() if cmd_complete.status != ControllerErrorCodes.SUCCESS: - raise RuntimeError("hci.read_bd_addr() returned, status: 0x{:02x} {}".format( - cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) + raise RuntimeError("Failed to read BD_ADDR\n" + " command complete status: 0x{:02x} {}".format( + cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name)) else: - local_bd_addr = bd_addr_bytes2str(cmd_complete.bd_addr).upper() + local_bd_addr = cmd_complete.bd_addr # Clear bluetoothd cache - cache_path = PosixPath('/var/lib/bluetooth/')/local_bd_addr/'cache' + cache_path = PosixPath('/var/lib/bluetooth/')/local_bd_addr.upper()/'cache' if cache_path.exists(): for file in cache_path.iterdir(): os.remove(file) @@ -150,10 +156,10 @@ def main(): # 当 user 没有显示指明 hci 设备情况下,我们需要自动获取一个可用的 hci # 设备。注意这个设备不一定是 hci0。因为系统中可能只有 hci1,而没有 hci0。 try: - args['-i'] = HCI.get_default_hcistr() + args['-i'] = HCI.get_default_iface() except HciError: logger.error('No available HCI device') - exit(-1) + sys.exit(-1) prepare_hci(args['-i']) @@ -202,7 +208,7 @@ def main(): except (RuntimeError, ValueError) as e: logger.error("{}: {}".format(e.__class__.__name__, e)) traceback.print_exc() - exit(1) + sys.exit(1) except (BTLEException) as e: logger.error(str(e) + ("\nNo BLE adapter or missing sudo?" if 'le on' in str(e) else "")) except PluginInstallError as e: diff --git a/src/bluescan/br_scan.py b/src/bluescan/br_scan.py index 014ab2a..a214d6d 100644 --- a/src/bluescan/br_scan.py +++ b/src/bluescan/br_scan.py @@ -5,8 +5,10 @@ from bthci import HCI, HciRuntimeError, ControllerErrorCodes, HciEventCodes, \ HCI_Inquiry_Result, HCI_Inquiry_Result_with_RSSI, HCI_Extended_Inquiry_Result +from bthci.bluez_hci import HCI_CHANNEL_USER from pyclui import Logger from pyclui import green, blue, yellow, red +from xpycommon.bluetooth import bd_addr_str2bytes from . import BlueScanner, service_cls_profile_ids, LOG_LEVEL from .ui import INDENT @@ -64,22 +66,25 @@ def inquiry_result_handler(result: bytes): try: hci.inquiry(inquiry_len=inquiry_len, inquiry_result_handler=inquiry_result_handler) - logger.info('Inquiry completed\n') + # logger.info('Inquiry completed\n') if self.remote_name_req_flag and len(self.scanned_dev) != 0: logger.info('Requesting the name of the scanned devices...') for bd_addr in self.scanned_dev: try: - name = hci.remote_name_request({ - 'BD_ADDR': bytes.fromhex(bd_addr.replace(':', '')), - 'Page_Scan_Repetition_Mode': 0x01, - 'Reserved': 0x00, 'Clock_Offset': 0x0000 - })['Remote_Name'].decode().strip() - except Exception as e: - print(e) + remote_name_req_compelte = hci.remote_name_request(bd_addr) + if remote_name_req_compelte.status !=ControllerErrorCodes.SUCCESS: + logger.error("Failed to request remote name {}\n" + " remote name request complete status: 0x{:02x} {}".format( + bd_addr, + remote_name_req_compelte.status, ControllerErrorCodes[remote_name_req_compelte.status].name)) + name = '' + else: + name = remote_name_req_compelte.remote_name + except TimeoutError as e: name = '' - print(bd_addr+':', blue(name)) + print("{} : {}".format(bd_addr, blue(name))) except HciRuntimeError as e: logger.error("{}".format(e)) except KeyboardInterrupt as e: @@ -90,12 +95,12 @@ def inquiry_result_handler(result: bytes): def scan_lmp_feature(self, paddr: str): - hci = HCI(self.iface) + hci = HCI(self.iface, HCI_CHANNEL_USER) conn_complete = hci.create_connection(paddr, page_scan_repetition_mode = 0x02) if conn_complete.status != ControllerErrorCodes.SUCCESS: logger.error("Failed to connect {} BD/EDR address\n" - " status {} {}".format( + " connection complete status: 0x{:02x} {}".format( paddr, conn_complete.status, ControllerErrorCodes[conn_complete.status].name)) sys.exit(1) @@ -115,9 +120,11 @@ def scan_lmp_feature(self, paddr: str): read_remote_supported_features_complete = hci.read_remote_supported_features(conn_complete.conn_handle) if read_remote_supported_features_complete.status != ControllerErrorCodes.SUCCESS: - logger.error('Failed to read remote supported features') + logger.error("Failed to read remote extented features\n" + " read remote extented features complete status: 0x{:02x} {}".format( + read_remote_ext_features_complete.status, ControllerErrorCodes[read_remote_ext_features_complete.status].name)) hci.disconnect(conn_complete.conn_handle) - exit(1) + sys.exit(1) print(blue('LMP features')) pp_lmp_features(read_remote_supported_features_complete.lmp_features) @@ -127,14 +134,17 @@ def scan_lmp_feature(self, paddr: str): sys.exit(1) print(blue('Extended LMP features')) + # Get Max_Page_Number read_remote_ext_features_complete = hci.read_remote_extended_features(conn_complete.conn_handle, 0x00) if read_remote_ext_features_complete.status != ControllerErrorCodes.SUCCESS: - logger.error('Failed to read remote extented features') + logger.error("Failed to read remote extented features\n" + " read remote extented features complete status: 0x{:02x} {}".format( + read_remote_ext_features_complete.status, ControllerErrorCodes[read_remote_ext_features_complete.status].name)) hci.disconnect(conn_complete.conn_handle) - exit(1) - - pp_ext_lmp_features(read_remote_ext_features_complete.ext_lmp_features, 0) - for i in range(1, read_remote_ext_features_complete.max_page_num+1): + sys.exit(1) + + max_page_num = read_remote_ext_features_complete.max_page_num + for i in range(1, max_page_num+1): read_remote_ext_features_complete_i = hci.read_remote_extended_features(conn_complete.conn_handle, i) if read_remote_ext_features_complete_i.status != ControllerErrorCodes.SUCCESS: logger.error('Failed to read remote extented features, page {}'.format(i)) diff --git a/src/bluescan/gatt_scan.py b/src/bluescan/gatt_scan.py index 0b97674..a0b0207 100644 --- a/src/bluescan/gatt_scan.py +++ b/src/bluescan/gatt_scan.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import io +from multiprocessing.sharedctypes import Value import pickle import threading import subprocess @@ -9,6 +10,7 @@ from uuid import UUID import pkg_resources +from bthci import ADDR_TYPE_PUBLIC, ADDR_TYPE_RANDOM from pyclui import green, blue, yellow, red from pyclui import Logger from halo import Halo @@ -323,7 +325,7 @@ def register_agent_error_callback(error): reply_handler=register_agent_callback, error_handler=register_agent_error_callback) - def scan(self, addr: str, addr_type: str) -> GattScanResult: + def scan(self, addr: str, addr_type: int = ADDR_TYPE_PUBLIC) -> GattScanResult: logger.debug("Entered scan()") try: @@ -339,8 +341,14 @@ def run_mainloop(): self.result.addr_type = addr_type if self.result.addr_type is None: - self.result.addr_type = self.determine_addr_type() - + found_addr_type = self.determine_addr_type() + if found_addr_type == 'public': + self.result.addr_type = ADDR_TYPE_PUBLIC + elif found_addr_type == 'random': + self.result.addr_type = ADDR_TYPE_RANDOM + else: + raise ValueError("Unknown found_addr_type: {}".format(found_addr_type)) + self.gatt_client = GattClient(self.iface) logger.debug("Address: {}\n".format(self.result.addr) + @@ -593,8 +601,3 @@ def determine_addr_type(self): return dev_info.addr_type raise RuntimeError("Couldn't determine the LE address type. Please provide it explicitly.") - - -if __name__ == '__main__': - result = GattScanner().scan('6F:45:66:76:41:12', 'random', True) - result.print() diff --git a/src/bluescan/le_scan.py b/src/bluescan/le_scan.py index 7d7f2fc..9caaaeb 100644 --- a/src/bluescan/le_scan.py +++ b/src/bluescan/le_scan.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -from pdb import runeval +import sys import pickle import struct import subprocess @@ -11,8 +11,8 @@ from halo import Halo from serial import Serial -from bthci import HCI, ControllerErrorCodes -from bthci.hci import HciRuntimeError +from xpycommon.bluetooth import IoCapabilities +from bthci import HCI, ControllerErrorCodes, HciRuntimeError, ADDR_TYPE_PUBLIC import btsmp from btsmp import * from pyclui import Logger, blue, green, red @@ -64,7 +64,7 @@ def handleDiscovery(self, scanEntry, isNewDev, isNewData): class LeDeviceInfo: - def __init__(self, addr: str, addr_type : str, connectable : bool, rssi : int) -> None: + def __init__(self, addr: str, addr_type: str, connectable : bool, rssi : int) -> None: """ addr - Upper case """ @@ -244,7 +244,7 @@ def scan_devs(self, timeout=8, scan_type='active', sort='rssi') -> LeDevicesScan devs.sort(key=lambda d:d.rssi) for dev in devs: - dev_info = LeDeviceInfo(dev.addr.upper(), dev.addrType, dev.connectable, dev.rssi) + dev_info = LeDeviceInfo(dev.addr.upper(), dev.addrType.lower(), dev.connectable, dev.rssi) self.devs_scan_result.add_device_info(dev_info) # print('Addr: ', blue(dev.addr.upper())) @@ -280,11 +280,11 @@ def scan_devs(self, timeout=8, scan_type='active', sort='rssi') -> LeDevicesScan return self.devs_scan_result - def scan_ll_feature(self, paddr: str, patype: str, timeout: int = 10): + def scan_ll_feature(self, paddr: str, patype: int = ADDR_TYPE_PUBLIC, timeout: int = 10): """LL feature scanning paddr - Peer addresss for scanning LL features. - patype - Peer address type, public or random. + patype - Peer address type, ADDR_TYPE_PUBLIC or ADDR_TYPE_RANDOM. timeout - sec """ spinner = Halo(text="Scanning", spinner={'interval': 200, @@ -317,7 +317,7 @@ def scan_ll_feature(self, paddr: str, patype: str, timeout: int = 10): logger.error("Failed to le read remote features\n" " status: 0x{:02x} {}".format(le_read_remote_features_complete.status, ControllerErrorCodes[le_read_remote_features_complete.status].name)) - exit(1) + sys.exit(1) spinner.stop() print(blue('LE LL Features:')) @@ -327,7 +327,7 @@ def scan_ll_feature(self, paddr: str, patype: str, timeout: int = 10): return - def detect_pairing_feature(self, paddr, patype, timeout:int=10): + def detect_pairing_feature(self, paddr, patype: int = ADDR_TYPE_PUBLIC, timeout: int = 10): """ """ # TODO Mac OS 会弹窗,需要解决。 @@ -352,7 +352,6 @@ def detect_pairing_feature(self, paddr, patype, timeout:int=10): 'frames': ['', '.', '.'*2, '.'*3]}, placement='right') hci = HCI(self.hci) - logger.info('Scanning LE LL Features of %s, using %s\n'%(blue(paddr), blue(self.hci))) spinner.start() @@ -441,8 +440,8 @@ def pp_smp_pkt(pkt:bytes): if code == btsmp.CmdCode.PAIRING_RESPONSE: print(blue("Pairing Response")) - iocap, oob, auth_req, max_enc_key_size, init_key_distr, rsp_key_distr = struct.unpack('BBBBBB', pkt[1:]) - print(" IO Capability: 0x%02x - %s" % (iocap, green(IOCapability[iocap].hname))) + io_cap, oob, auth_req, max_enc_key_size, init_key_distr, rsp_key_distr = struct.unpack('BBBBBB', pkt[1:]) + print(" IO Capability: 0x%02x - %s" % (io_cap, green(IoCapabilities[io_cap].name))) print(" OOB data flag: 0x%02x - %s" % (oob, OOBDataFlag[oob].hname)) print(" AuthReq: 0x%02x" % auth_req) bonding_flag = (auth_req & BONDING_FLAGS_MSK) >> BONDING_FLAGS_POS diff --git a/src/bluescan/lmp.py b/src/bluescan/lmp.py index ba88138..927c979 100644 --- a/src/bluescan/lmp.py +++ b/src/bluescan/lmp.py @@ -112,7 +112,7 @@ def pp_lmp_features(lmp_features:bytes): print(' Extended features:', green('True') if (b >> 7) & 0x01 else red('False')) -def pp_ext_lmp_features(ext_lmp_features:bytes, page_num:int): +def pp_ext_lmp_features(ext_lmp_features: bytes, page_num: int): '''Parse and print Extended LMP Features ext_lmp_features -- when page_num is 0, 8 bytes; diff --git a/src/bluescan/ui.py b/src/bluescan/ui.py index 1152098..a235b84 100644 --- a/src/bluescan/ui.py +++ b/src/bluescan/ui.py @@ -57,10 +57,12 @@ --run-plugin= Execute plugin by name. """ +import sys from docopt import docopt # from btgatt import service_names, charac_names, descriptor_names from pyclui import red, Logger +from bthci import ADDR_TYPE_PUBLIC, ADDR_TYPE_RANDOM from xpycommon import valid_bdaddr @@ -109,15 +111,18 @@ def parse_cmdline() -> dict: if args['--addr-type'] is not None: args['--addr-type'] = args['--addr-type'].lower() - if args['--addr-type'] not in ('public', 'random'): - raise ValueError("Invalid address type %s, " % \ - args['--addr-type'] + "must be public or random.") + if args['--addr-type'] == 'public': + args['--addr-type'] = ADDR_TYPE_PUBLIC + elif args['--addr-type'] == 'random': + args['--addr-type'] = ADDR_TYPE_RANDOM + else: + raise ValueError("Invalid address type %s, must be public or random.".format(args['--addr-type'])) if args['--io-capability'] not in ['DisplayOnly', 'DisplayYesNo', 'KeyboardOnly', 'NoInputNoOutput', 'KeyboardDisplay', 'KeyboardOnly']: raise ValueError("Invalid IO capability %s" % args['--io-capability']) except ValueError as e: logger.error(str(e)) - exit(1) + sys.exit(1) return args