Skip to content

Commit

Permalink
Adapted to the higher version of the dependencies and made some small…
Browse files Browse the repository at this point in the history
… improvements
  • Loading branch information
x committed Sep 14, 2022
1 parent 1f002ea commit 28394c5
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 77 deletions.
6 changes: 4 additions & 2 deletions README-cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
```sh
# 安装依赖
sudo apt install python3-pip python3-dev libcairo2-dev libgirepository1.0-dev \
libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev
libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev \
rfkill meson

# 安装 bluescan
sudo pip install bluescan
Expand Down Expand Up @@ -53,7 +54,8 @@ bluescan 基于 Linux 官方的 BlueZ 蓝牙协议栈开发。它仅支持在 Li

```sh
sudo apt install python3-pip python3-dev libcairo2-dev libgirepository1.0-dev \
libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev
libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev \
rfkill meson
```

如果后续在[安装](https://github.com/fO-000/bluescan/blob/master/README-cn.md#%E5%AE%89%E8%A3%85) bluescan 时仍遇到错误,请尝试继续安装如下 package 来解决:
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
```sh
# Install dependencies
sudo apt install python3-pip python3-dev libcairo2-dev libgirepository1.0-dev \
libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev
libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev \
rfkill meson

# Install bluescan
sudo pip install bluescan
Expand Down Expand Up @@ -53,7 +54,8 @@ bluescan is based on BlueZ, the official Linux Bluetooth stack. It only supports

```sh
sudo apt install python3-pip python3-dev libcairo2-dev libgirepository1.0-dev \
libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev
libbluetooth-dev libdbus-1-dev bluez-tools python3-cairo-dev \
rfkill meson
```

If you still encounter errors when [installing](https://github.com/fO-000/bluescan#install) bluescan, please try to install the following packages to solve:
Expand Down
10 changes: 5 additions & 5 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ package_dir =
packages = find:
python_requires = ==3.10.*
install_requires =
xpycommon >= 0.0.11
xpycommon >= 0.0.12
pyclui >= 0.0.17
bthci >= 0.0.31
btsmp >= 0.0.10
btatt >= 0.0.12
btgatt >= 0.0.16
bthci >= 0.0.33
btsmp >= 0.0.11
btatt >= 0.0.14
btgatt >= 0.0.17
bluepy >= 1.3.0
halo >= 0.0.31
docopt >= 0.6.2
Expand Down
7 changes: 3 additions & 4 deletions src/bluescan/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#!/usr/bin/env python3

PKG_NAME = 'bluescan'
VERSION = '0.8.4'
VERSION = '0.8.5'
DEBUG_VERSION = None

from pyclui import Logger, INFO, DEBUG


LOG_LEVEL = INFO
logger = Logger(__name__, LOG_LEVEL)

Expand All @@ -15,12 +14,12 @@
logger.setLevel(LOG_LEVEL)
logger.warning("Using the debug version {} of {}".format(DEBUG_VERSION, PKG_NAME))


import io
import pkg_resources
from pathlib import Path

from bthci import HCI, ControllerErrorCodes
from xpycommon.bluetooth import bd_addr_bytes2str


PKG_ROOT = Path(__file__).parent
Expand Down Expand Up @@ -75,7 +74,7 @@ def __init__(self, iface='hci0'):
raise RuntimeError("hci.read_bd_addr() returned, status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))
else:
self.hci_bd_addr = bd_addr_bytes2str(cmd_complete.bd_addr).upper()
self.hci_bd_addr = cmd_complete.bd_addr.upper()


class ScanResult:
Expand Down
48 changes: 27 additions & 21 deletions src/bluescan/__main__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3

import sys
import os
import subprocess
import traceback
Expand All @@ -10,8 +11,6 @@
from pyclui import Logger, blue
from bluepy.btle import BTLEException

from xpycommon.bluetooth import bd_addr_bytes2str

from . import BlueScanner, LOG_LEVEL
from .ui import parse_cmdline
from .helper import find_rfkill_devid, get_microbit_devpaths
Expand Down Expand Up @@ -48,44 +47,51 @@ def prepare_hci(iface: str = 'hci0'):
logger.debug("Sending hci.inquiry_cancel()")
cmd_complete = hci.inquiry_cancel()
if cmd_complete.status not in (ControllerErrorCodes.SUCCESS, ControllerErrorCodes.COMMAND_DISALLOWED):
logger.warning("hci.inquiry_cancel() returned, status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))
logger.warning("Failed to inquiry cancel\n"
" command complete status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))

logger.debug("Sending hci.exit_periodic_inquiry_mode()")
cmd_complete = hci.exit_periodic_inquiry_mode()
if cmd_complete.status not in (ControllerErrorCodes.SUCCESS, ControllerErrorCodes.COMMAND_DISALLOWED):
logger.warning("hci.exit_periodic_inquiry_mode() returned, status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))
logger.warning("Failed to exit periodic inquiry mode\n"
" command complete status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))

hci.write_scan_enable() # No scan enabled
if cmd_complete.status not in (ControllerErrorCodes.SUCCESS, ControllerErrorCodes.COMMAND_DISALLOWED):
logger.warning("hci.write_scan_enable() returned, status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))
logger.warning("Failed to write scan enable\n"
" command complete status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))

cmd_complete = hci.le_set_advertising_enable() # Advertising is disabled
if cmd_complete.status not in (ControllerErrorCodes.SUCCESS, ControllerErrorCodes.COMMAND_DISALLOWED):
logger.warning("hci.le_set_advertising_enable() returned, status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))
logger.warning("Failed to le set advertising enable\n"
" command complete status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))

cmd_complete = hci.le_set_scan_enable(False, True)
if cmd_complete.status not in (ControllerErrorCodes.SUCCESS, ControllerErrorCodes.COMMAND_DISALLOWED):
logger.warning("hci.le_set_scan_enable() returned, status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))
logger.warning("Failed to le set scan enable\n"
" command complete status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))

cmd_complete = hci.set_event_filter(0x00) # Clear All Filters
if cmd_complete.status != ControllerErrorCodes.SUCCESS:
logger.warning("hci.set_event_filter() returned, status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))
logger.warning("Failed to set event filter\n"
" command complete status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))

cmd_complete = hci.read_bd_addr()
if cmd_complete.status != ControllerErrorCodes.SUCCESS:
raise RuntimeError("hci.read_bd_addr() returned, status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))
raise RuntimeError("Failed to read BD_ADDR\n"
" command complete status: 0x{:02x} {}".format(
cmd_complete.status, ControllerErrorCodes[cmd_complete.status].name))
else:
local_bd_addr = bd_addr_bytes2str(cmd_complete.bd_addr).upper()
local_bd_addr = cmd_complete.bd_addr

# Clear bluetoothd cache
cache_path = PosixPath('/var/lib/bluetooth/')/local_bd_addr/'cache'
cache_path = PosixPath('/var/lib/bluetooth/')/local_bd_addr.upper()/'cache'
if cache_path.exists():
for file in cache_path.iterdir():
os.remove(file)
Expand Down Expand Up @@ -150,10 +156,10 @@ def main():
# 当 user 没有显示指明 hci 设备情况下,我们需要自动获取一个可用的 hci
# 设备。注意这个设备不一定是 hci0。因为系统中可能只有 hci1,而没有 hci0。
try:
args['-i'] = HCI.get_default_hcistr()
args['-i'] = HCI.get_default_iface()
except HciError:
logger.error('No available HCI device')
exit(-1)
sys.exit(-1)

prepare_hci(args['-i'])

Expand Down Expand Up @@ -202,7 +208,7 @@ def main():
except (RuntimeError, ValueError) as e:
logger.error("{}: {}".format(e.__class__.__name__, e))
traceback.print_exc()
exit(1)
sys.exit(1)
except (BTLEException) as e:
logger.error(str(e) + ("\nNo BLE adapter or missing sudo?" if 'le on' in str(e) else ""))
except PluginInstallError as e:
Expand Down
46 changes: 28 additions & 18 deletions src/bluescan/br_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

from bthci import HCI, HciRuntimeError, ControllerErrorCodes, HciEventCodes, \
HCI_Inquiry_Result, HCI_Inquiry_Result_with_RSSI, HCI_Extended_Inquiry_Result
from bthci.bluez_hci import HCI_CHANNEL_USER
from pyclui import Logger
from pyclui import green, blue, yellow, red
from xpycommon.bluetooth import bd_addr_str2bytes

from . import BlueScanner, service_cls_profile_ids, LOG_LEVEL
from .ui import INDENT
Expand Down Expand Up @@ -64,22 +66,25 @@ def inquiry_result_handler(result: bytes):

try:
hci.inquiry(inquiry_len=inquiry_len, inquiry_result_handler=inquiry_result_handler)
logger.info('Inquiry completed\n')
# logger.info('Inquiry completed\n')

if self.remote_name_req_flag and len(self.scanned_dev) != 0:
logger.info('Requesting the name of the scanned devices...')
for bd_addr in self.scanned_dev:
try:
name = hci.remote_name_request({
'BD_ADDR': bytes.fromhex(bd_addr.replace(':', '')),
'Page_Scan_Repetition_Mode': 0x01,
'Reserved': 0x00, 'Clock_Offset': 0x0000
})['Remote_Name'].decode().strip()
except Exception as e:
print(e)
remote_name_req_compelte = hci.remote_name_request(bd_addr)
if remote_name_req_compelte.status !=ControllerErrorCodes.SUCCESS:
logger.error("Failed to request remote name {}\n"
" remote name request complete status: 0x{:02x} {}".format(
bd_addr,
remote_name_req_compelte.status, ControllerErrorCodes[remote_name_req_compelte.status].name))
name = ''
else:
name = remote_name_req_compelte.remote_name
except TimeoutError as e:
name = ''

print(bd_addr+':', blue(name))
print("{} : {}".format(bd_addr, blue(name)))
except HciRuntimeError as e:
logger.error("{}".format(e))
except KeyboardInterrupt as e:
Expand All @@ -90,12 +95,12 @@ def inquiry_result_handler(result: bytes):


def scan_lmp_feature(self, paddr: str):
hci = HCI(self.iface)
hci = HCI(self.iface, HCI_CHANNEL_USER)
conn_complete = hci.create_connection(paddr, page_scan_repetition_mode = 0x02)

if conn_complete.status != ControllerErrorCodes.SUCCESS:
logger.error("Failed to connect {} BD/EDR address\n"
" status {} {}".format(
" connection complete status: 0x{:02x} {}".format(
paddr,
conn_complete.status, ControllerErrorCodes[conn_complete.status].name))
sys.exit(1)
Expand All @@ -115,9 +120,11 @@ def scan_lmp_feature(self, paddr: str):

read_remote_supported_features_complete = hci.read_remote_supported_features(conn_complete.conn_handle)
if read_remote_supported_features_complete.status != ControllerErrorCodes.SUCCESS:
logger.error('Failed to read remote supported features')
logger.error("Failed to read remote extented features\n"
" read remote extented features complete status: 0x{:02x} {}".format(
read_remote_ext_features_complete.status, ControllerErrorCodes[read_remote_ext_features_complete.status].name))
hci.disconnect(conn_complete.conn_handle)
exit(1)
sys.exit(1)

print(blue('LMP features'))
pp_lmp_features(read_remote_supported_features_complete.lmp_features)
Expand All @@ -127,14 +134,17 @@ def scan_lmp_feature(self, paddr: str):
sys.exit(1)

print(blue('Extended LMP features'))
# Get Max_Page_Number
read_remote_ext_features_complete = hci.read_remote_extended_features(conn_complete.conn_handle, 0x00)
if read_remote_ext_features_complete.status != ControllerErrorCodes.SUCCESS:
logger.error('Failed to read remote extented features')
logger.error("Failed to read remote extented features\n"
" read remote extented features complete status: 0x{:02x} {}".format(
read_remote_ext_features_complete.status, ControllerErrorCodes[read_remote_ext_features_complete.status].name))
hci.disconnect(conn_complete.conn_handle)
exit(1)

pp_ext_lmp_features(read_remote_ext_features_complete.ext_lmp_features, 0)
for i in range(1, read_remote_ext_features_complete.max_page_num+1):
sys.exit(1)
max_page_num = read_remote_ext_features_complete.max_page_num
for i in range(1, max_page_num+1):
read_remote_ext_features_complete_i = hci.read_remote_extended_features(conn_complete.conn_handle, i)
if read_remote_ext_features_complete_i.status != ControllerErrorCodes.SUCCESS:
logger.error('Failed to read remote extented features, page {}'.format(i))
Expand Down
19 changes: 11 additions & 8 deletions src/bluescan/gatt_scan.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3

import io
from multiprocessing.sharedctypes import Value
import pickle
import threading
import subprocess
Expand All @@ -9,6 +10,7 @@
from uuid import UUID

import pkg_resources
from bthci import ADDR_TYPE_PUBLIC, ADDR_TYPE_RANDOM
from pyclui import green, blue, yellow, red
from pyclui import Logger
from halo import Halo
Expand Down Expand Up @@ -323,7 +325,7 @@ def register_agent_error_callback(error):
reply_handler=register_agent_callback,
error_handler=register_agent_error_callback)

def scan(self, addr: str, addr_type: str) -> GattScanResult:
def scan(self, addr: str, addr_type: int = ADDR_TYPE_PUBLIC) -> GattScanResult:
logger.debug("Entered scan()")

try:
Expand All @@ -339,8 +341,14 @@ def run_mainloop():
self.result.addr_type = addr_type

if self.result.addr_type is None:
self.result.addr_type = self.determine_addr_type()

found_addr_type = self.determine_addr_type()
if found_addr_type == 'public':
self.result.addr_type = ADDR_TYPE_PUBLIC
elif found_addr_type == 'random':
self.result.addr_type = ADDR_TYPE_RANDOM
else:
raise ValueError("Unknown found_addr_type: {}".format(found_addr_type))

self.gatt_client = GattClient(self.iface)

logger.debug("Address: {}\n".format(self.result.addr) +
Expand Down Expand Up @@ -593,8 +601,3 @@ def determine_addr_type(self):
return dev_info.addr_type

raise RuntimeError("Couldn't determine the LE address type. Please provide it explicitly.")


if __name__ == '__main__':
result = GattScanner().scan('6F:45:66:76:41:12', 'random', True)
result.print()
Loading

0 comments on commit 28394c5

Please sign in to comment.