Skip to content

Commit

Permalink
utils: add get_unassigned_devices (#15)
Browse files Browse the repository at this point in the history
List all unassigned devices.

This function will be used inside ns.dedalo API and can be used in the future for the interfaces APIs
  • Loading branch information
gsanchietti authored Oct 18, 2023
1 parent c70a0f9 commit 83eebeb
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 1 deletion.
53 changes: 53 additions & 0 deletions src/nethsec/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,59 @@ def get_group_macs(uci, group):
macs = macs + get_user_macs(uci, u)
return macs

def get_unassigned_devices(uci):
'''
Retrieve all unused/unassigned devices.
Arguments:
- uci -- EUci pointer
Returns:
- A list of devices
'''
unassigned = []
try:
p = subprocess.run(["ip", "-j", "link"], check=True, capture_output=True, text=True)
devices = json.loads(p.stdout)
except:
return []

u_interfaces = get_all_by_type(uci, 'network', 'interface')
u_devices = get_all_by_type(uci, 'network', 'device')

for ip_device in devices:
free = True
ifname = ip_device.get('ifname', '')
# exclude special devices
if not ifname or ifname == "lo" or ifname.startswith("ifb-"):
continue
# search among UCI devices
for d in u_devices:
# ports are present on bridge devices
try:
ports = uci.get_all('network', d, 'ports')
except:
ports = []
if uci.get('network', d, 'name', default="") == ifname or ifname in ports:
free = False
# search among UCI interfaces
for i in u_interfaces:
# slaves are present on bond devices
slaves = []
if uci.get('network', i, 'proto', default='') == 'bonding':
slaves = list(uci.get_all('network', i, 'slaves'))
# for bonds, the section name is the nmame of device with 'bond-' prefix
slaves.append(f'bond-{i}')
device = uci.get('network', i, 'device', default="")
if device == ifname or ifname in slaves:
free = False
# search inside hotspot configuration
if uci.get('dedalo', 'config', 'disabled', default="1") == "0" and uci.get('dedalo', 'config', 'interface', default="") == ifname:
free = False
if free:
unassigned.append(ifname)
return unassigned


def validation_errors(errors):
'''
Expand Down
34 changes: 33 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from unittest.mock import MagicMock, patch

# Setup fake ip command output
ip_json='[{"ifindex":9,"ifname":"vnet3","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"noqueue","master":"virbr2","operstate":"UNKNOWN","group":"default","txqlen":1000,"link_type":"ether","address":"fe:62:31:19:0b:29","broadcast":"ff:ff:ff:ff:ff:ff","addr_info":[{"family":"inet6","local":"fe80::fc62:31ff:fe19:b29","prefixlen":64,"scope":"link","valid_life_time":4294967295,"preferred_life_time":4294967295}]}]'
ip_json='[{"ifindex":9,"ifname":"vnet3","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"noqueue","master":"virbr2","operstate":"UNKNOWN","group":"default","txqlen":1000,"link_type":"ether","address":"fe:62:31:19:0b:29","broadcast":"ff:ff:ff:ff:ff:ff","addr_info":[{"family":"inet6","local":"fe80::fc62:31ff:fe19:b29","prefixlen":64,"scope":"link","valid_life_time":4294967295,"preferred_life_time":4294967295}]},{"ifindex":2,"ifname":"eth0","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"fq_codel","master":"br-lan","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"52:54:00:6a:50:bf","broadcast":"ff:ff:ff:ff:ff:ff"},{"ifindex":3,"ifname":"eth1","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"fq_codel","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"52:54:00:20:82:a6","broadcast":"ff:ff:ff:ff:ff:ff"},{"ifindex":4,"ifname":"eth2","flags":["BROADCAST","MULTICAST"],"mtu":1500,"qdisc":"noop","operstate":"DOWN","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"52:54:00:75:1c:c1","broadcast":"ff:ff:ff:ff:ff:ff"},{"ifindex":5,"ifname":"eth3","flags":["BROADCAST","MULTICAST"],"mtu":1500,"qdisc":"noop","operstate":"DOWN","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"52:54:00:ad:6f:63","broadcast":"ff:ff:ff:ff:ff:ff"},{"ifindex":6,"ifname":"ifb-dns","flags":["BROADCAST","NOARP","UP","LOWER_UP"],"mtu":1500,"qdisc":"fq_codel","operstate":"UNKNOWN","linkmode":"DEFAULT","group":"default","txqlen":32,"link_type":"ether","address":"72:79:65:12:07:07","broadcast":"ff:ff:ff:ff:ff:ff"},{"ifindex":7,"ifname":"br-lan","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"noqueue","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"52:54:00:6a:50:bf","broadcast":"ff:ff:ff:ff:ff:ff"},{"ifindex":9,"ifname":"bond-bond1","flags":["BROADCAST","MULTICAST","MASTER","UP","LOWER_UP"],"mtu":1500,"qdisc":"noqueue","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"52:54:00:ad:6f:63","broadcast":"ff:ff:ff:ff:ff:ff"}]'
mock_ip_stdout = MagicMock()
mock_ip_stdout.configure_mock(**{"stdout": ip_json})

Expand Down Expand Up @@ -48,6 +48,11 @@
option device 'vnet3'
option proto 'static'
config device
option name 'br-lan'
option type 'bridge'
list ports 'eth0'
config device
option type '8021q'
option ifname 'eth2'
Expand All @@ -61,6 +66,17 @@
config interface 'wan6'
option device 'eth1'
option proto 'dhcpv6'
config interface 'bond1'
option proto 'bonding'
option ipaddr '10.0.0.22'
option netmask '255.255.255.0'
list slaves 'eth3'
option bonding_policy 'balance-rr'
option packets_per_slave '1'
option all_slaves_active '0'
option link_monitoring 'off'
"""

objects_db = """
Expand Down Expand Up @@ -104,6 +120,12 @@
option enabled '1'
"""

dedalo_db = """
config dedalo 'config'
option disabled '0'
option interface 'eth2'
"""

def _setup_db(tmp_path):
# setup fake db
with tmp_path.joinpath('test').open('w') as fp:
Expand All @@ -118,6 +140,8 @@ def _setup_db(tmp_path):
fp.write(dhcp_db)
with tmp_path.joinpath('openvpn').open('w') as fp:
fp.write(openvpn_db)
with tmp_path.joinpath('dedalo').open('w') as fp:
fp.write(dedalo_db)
return EUci(confdir=tmp_path.as_posix())

def test_sanitize():
Expand Down Expand Up @@ -238,3 +262,11 @@ def test_validation_errors():
["param2", " invalid_value ", False],
])
assert er1 == {"validation": {"errors": [{"parameter": "param1", "message": "my_error", "value": "val1"}, {"parameter": "param2", "message": "invalid_value", "value": False}]}}

@patch("nethsec.utils.subprocess.run")
def test_get_unassigned_devices(mock_run_ip, tmp_path):
# setup mock
mock_run_ip.return_value = mock_ip_stdout

u = _setup_db(tmp_path)
assert utils.get_unassigned_devices(u) == []

0 comments on commit 83eebeb

Please sign in to comment.