Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scanner force-scanning #511

Merged
merged 25 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d3be0ac
Fix scanner force-scanning
uzlonewolf Jun 23, 2024
d3bc3d0
Allow a string as the force-scan list
uzlonewolf Jun 23, 2024
d088823
Add force-scanning v3.5 devices to scanner
uzlonewolf Jun 23, 2024
e9fd881
Add force-scanning v3.5 devices to scanner
uzlonewolf Jun 23, 2024
0690e8c
v1.14.2 Notes
jasonacox Jun 23, 2024
7259b8d
Rename scanner variable try_v35 to a more descriptive try_v35_with_v34
uzlonewolf Jun 24, 2024
9420ec6
Make sure set_version() is given a float (#507)
uzlonewolf Jun 24, 2024
133fd35
Allow host bits in the force-scan network list
uzlonewolf Jun 25, 2024
da6d01b
Refactor _print_device_info function to include verbose flag
jasonacox Jun 25, 2024
51f7653
Merge branch 'forcescan-fix' of https://github.com/uzlonewolf/tinytuy…
jasonacox Jun 25, 2024
7d160a0
Allow device discovery packets on port 7000
uzlonewolf Jul 7, 2024
ff6abf2
Add get_ip_to_broadcast() function to scanner
uzlonewolf Jul 7, 2024
5f38b49
Add "Force Scan" button and UI updates
jasonacox Jul 7, 2024
59a013f
Merge branch 'forcescan-fix' of https://github.com/uzlonewolf/tinytuy…
jasonacox Jul 7, 2024
831f113
Make scanner send broadcasts to port 7000
uzlonewolf Jul 7, 2024
4d46538
Update control panel image
jasonacox Jul 7, 2024
6073a8c
Add port 7000 support to tools/pcap_parse.py
uzlonewolf Jul 7, 2024
3610184
Merge branch 'forcescan-fix' of github-lw:uzlonewolf/tinytuya into fo…
uzlonewolf Jul 7, 2024
a20b080
Fix incorrect message log level
uzlonewolf Jul 7, 2024
2fbb691
Allow calling send_discovery_request() without an argument
uzlonewolf Jul 7, 2024
0b19990
Rewrite tinytuya.find_device() to use the scanner
uzlonewolf Jul 7, 2024
c6dba30
Also use psutil to get the force-scan IP list if netifaces is not ava…
uzlonewolf Jul 7, 2024
b00882a
Close the broadcast sockets once we're done with them
uzlonewolf Jul 7, 2024
728779d
Add broadcast for 3.5 devices to server
jasonacox Jul 7, 2024
3aea261
Release notes
jasonacox Jul 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# RELEASE NOTES

## v1.14.1 - Scanner Fixes

* Fix force-scanning bug in scanner introduced in last release by @uzlonewolf in https://github.com/jasonacox/tinytuya/pull/511.

## v1.14.0 - Command Line Updates

* PyPI 1.14.0 rewrite of main to use argparse and add additional options by @uzlonewolf in https://github.com/jasonacox/tinytuya/pull/503
Expand Down
3 changes: 2 additions & 1 deletion tinytuya/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
# Colorama terminal color capability for all platforms
init()

version_tuple = (1, 14, 0)
version_tuple = (1, 14, 1)
version = __version__ = "%d.%d.%d" % version_tuple
__author__ = "jasonacox"

Expand Down Expand Up @@ -1654,6 +1654,7 @@ def add_dps_to_request(self, dp_indicies):
self.dps_to_request.update({str(index): None for index in dp_indicies})

def set_version(self, version): # pylint: disable=W0621
version = float(version)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call! Yes, this surfaces as an issue every 3-6mo or so.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I thought I'd sneak that one in there :) I'm not marking it as 'closes #507' as I think there's more we can do to type-safe everything.

self.version = version
self.version_str = "v" + str(version)
self.version_bytes = str(version).encode('latin1')
Expand Down
75 changes: 60 additions & 15 deletions tinytuya/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ def __init__( self, ip, deviceinfo, options, debug ):
self.timeo = 0
self.resets = 0
self.step = FSCAN_NOT_STARTED
self.try_v35_with_v34 = False
self.cur_key = None
self.hard_time_limit = time.time() + 30
self.initial_connect_retries = options['retries']
Expand Down Expand Up @@ -275,6 +276,11 @@ def v34_negotiate_sess_key_start( self ):
print('v3.4/5 trying key', self.ip, self.device.real_local_key)
step1 = self.device._negotiate_session_key_generate_step_1()
self.sock.sendall( self.device._encode_message( step1 ) )
if self.try_v35_with_v34 and self.device.version == 3.4:
self.device.version = 3.5
step1 = self.device._negotiate_session_key_generate_step_1()
self.sock.sendall( self.device._encode_message( step1 ) )
self.device.version = 3.4
if self.debug:
print('v3.4/5 session key neg start, debug ip', self.ip)

Expand All @@ -297,6 +303,8 @@ def __init__( self, ip, deviceinfo, options, debug ):
self.retries = 0
self.keygen = None
self.brute_force_data = []
self.try_v35_with_v34 = True
self.v34_connect_ok = False

self.connect()

Expand Down Expand Up @@ -334,7 +342,7 @@ def timeout( self, forced=False ):
print('ForceScannedDevice: Debug sock', self.ip, 'connect timed out!')
elif self.step == FSCAN_INITIAL_CONNECT:
if self.debug:
print('ForceScannedDevice: Debug sock', self.ip, 'socket send failed,', 'no data received' if forced else 'receive timed out')
print('ForceScannedDevice: Debug sock', self.ip, 'socket send failed,', 'no data received,' if forced else 'receive timed out,', 'current retry:', self.retries)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good addition.

if self.retries < 2:
self.retries += 1
self.connect()
Expand Down Expand Up @@ -366,8 +374,9 @@ def timeout( self, forced=False ):
self.remove = True
else:
self.connect()
elif self.step == FSCAN_v34_BRUTE_FORCE_ACTIVE: # or self.step == FSCAN_v33_BRUTE_FORCE_ACTIVE or self.step == FSCAN_v31_BRUTE_FORCE_ACTIVE:
if not forced:
self.v34_connect_ok = False
elif self.step == FSCAN_v34_BRUTE_FORCE_ACTIVE:
if( (not forced) and (not self.v34_connect_ok) ):
# actual timeout, connect failed
if self.retries < 2:
self.retries += 1
Expand All @@ -382,6 +391,7 @@ def timeout( self, forced=False ):
return
# brute-forcing the key
self.v3x_brute_force_try_next_key()
self.v34_connect_ok = False
elif self.step == FSCAN_v31_BRUTE_FORCE_ACTIVE:
# brute-forcing the key
self.v3x_brute_force_try_next_key()
Expand Down Expand Up @@ -418,6 +428,8 @@ def write_data( self ):
# 'False' when connection was made but then closed
# The IP address when the connection is still open
addr = self.get_peer()
if self.debug:
print('ForceScannedDevice: device', self.ip, 'addr is:', addr)
if addr is None:
# refused
self.close()
Expand All @@ -435,6 +447,7 @@ def write_data( self ):
#self.timeo = time.time() + self.options['data_timeout']
self.timeo = time.time() + 1.5
self.found = True
self.v34_connect_ok = True

if len(self.send_queue) > 0:
self.sock.sendall( self.device._encode_message( self.send_queue[0] ) )
Expand Down Expand Up @@ -502,15 +515,31 @@ def read_data( self ):
if self.debug:
print('ForceScannedDevice:', self.ip, 'got step', self.step, 'data:', data )

if len(data) == 0:
if len(data) == 0:
self.timeout( True )
return

while len(data):
try:
prefix_offset = data.find(tinytuya.PREFIX_BIN)
if prefix_offset > 0:
data = data[prefix_offset:]
if self.deviceinfo['version'] == 3.5:
prefix_offset = data.find(tinytuya.PREFIX_6699_BIN)
if prefix_offset > 0:
data = data[prefix_offset:]
else:
prefix_offset = data.find(tinytuya.PREFIX_BIN)
if prefix_offset >= 0:
data = data[prefix_offset:]
self.try_v35_with_v34 = False
elif self.try_v35_with_v34 and self.deviceinfo['version'] == 3.4:
prefix_offset = data.find(tinytuya.PREFIX_6699_BIN)
if prefix_offset >= 0:
if self.debug:
print('ForceScannedDevice: device is v3.5!')
data = data[prefix_offset:]
self.try_v35_with_v34 = False
self.deviceinfo['version'] = 3.5
self.device.set_version(3.5)
self.ver_found = True
hmac_key = self.device.local_key if self.deviceinfo['version'] >= 3.4 else None
msg = tinytuya.unpack_message(data, hmac_key=hmac_key)
except:
Expand Down Expand Up @@ -668,7 +697,7 @@ def brute_force_v3x_data( self ):
self.brute_force_found_key()
return True

self.brute_force_data = []
self.brute_force_data = []
return False

def v3x_brute_force_try_next_key( self ):
Expand Down Expand Up @@ -794,7 +823,7 @@ def write_data( self ):
self.write = False

try:
# connected, send the query
# connected, send the query
if self.device.version >= 3.4 :
# self.device.real_local_key, self.device.local_key
self.v34_negotiate_sess_key_start()
Expand Down Expand Up @@ -899,7 +928,7 @@ def _generate_ip(networks, verbose, term):
if tinytuya.IS_PY2 and type(netblock) == str:
netblock = netblock.decode('latin1')
try:
network = ipaddress.ip_network(netblock)
network = ipaddress.ip_network(netblock, strict=False)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So easy. ;-) thanks! 🙏

log.debug("Starting brute force network scan %s", network)
except:
log.debug("Unable to get network for %r, ignoring", netblock)
Expand Down Expand Up @@ -1129,8 +1158,24 @@ def tuyaLookup(deviceid):
if verbose:
print(term.subbold + " Option: " + term.dim + "Network force scanning requested.\n")

fstype = type(forcescan)
if fstype != list and fstype != tuple:
# argparse gives us a list of lists
# the inner list is empty [[]] when no address specified
add_connected = True
if isinstance( forcescan, list ) or isinstance( forcescan, tuple ):
for ip in forcescan:
if isinstance( ip, list ) or isinstance( ip, tuple ):
for ip2 in ip:
networks.append( ip2 )
add_connected = False
else:
networks.append( ip )
add_connected = False

if isinstance( forcescan, str ) or isinstance( forcescan, bytes ):
networks.append( forcescan )
add_connected = False

if add_connected:
if not NETIFLIBS:
print(term.alert +
' NOTE: netifaces module not available, multi-interface machines will be limited.\n'
Expand All @@ -1151,9 +1196,6 @@ def tuyaLookup(deviceid):
if not networks:
print(term.alert + 'No networks to force-scan, exiting.' + term.normal)
return None
else:
for ip in forcescan:
networks.append( ip )

if snapshot:
for ip in snapshot:
Expand All @@ -1162,6 +1204,9 @@ def tuyaLookup(deviceid):
snapshot = []

if networks:
if verbose:
log.debug("Force-scanning networks: %r", networks)

scan_ips = _generate_ip( networks, verbose, term )
ip_scan = ip_scan_running = True
if discover:
Expand Down
Loading