diff --git a/providers/base/bin/gateway_ping_test.py b/providers/base/bin/gateway_ping_test.py index 33bcc64cf..c3991e404 100755 --- a/providers/base/bin/gateway_ping_test.py +++ b/providers/base/bin/gateway_ping_test.py @@ -38,6 +38,7 @@ import time from contextlib import suppress +from typing import Dict class Route: @@ -103,33 +104,10 @@ def _get_default_gateway_from_proc(self): def _get_default_gateway_from_bin_route(self): """ - Get default gateway from /sbin/route -n - Called by get_default_gateway - and is only used if could not get that from /proc + Return the gateway for the interface associated with this Route object. """ - logging.debug( - _("Reading default gateway information from route binary") - ) - try: - routebin = subprocess.check_output( - ["/usr/bin/env", "route", "-n"], - env={"LANGUAGE": "C"}, - universal_newlines=True, - ) - except subprocess.CalledProcessError: - return None - route_line_re = re.compile( - r"^0\.0\.0\.0\s+(?P[\w.]+)(?P.+)", - flags=re.MULTILINE, - ) - route_lines = route_line_re.finditer(routebin) - for route_line in route_lines: - def_gateway = route_line.group("def_gateway") - interface = route_line.group("tail").rsplit(" ", 1)[-1] - if interface == self.interface and def_gateway: - return def_gateway - logging.error(_("Could not find default gateway by running route")) - return None + default_gws = get_default_gateways() + return default_gws.get(self.interface) def _get_ip_addr_info(self): return subprocess.check_output( @@ -192,8 +170,8 @@ def get_default_gateways(self) -> set: ) return def_gateways - @classmethod - def get_interface_from_ip(cls, ip): + @staticmethod + def get_interface_from_ip(ip): # Note: this uses -o instead of -j for xenial/bionic compatibility route_info = subprocess.check_output( ["ip", "-o", "route", "get", ip], universal_newlines=True @@ -207,8 +185,8 @@ def get_interface_from_ip(cls, ip): "Unable to determine any device used for {}".format(ip) ) - @classmethod - def get_any_interface(cls): + @staticmethod + def get_any_interface(): # Note: this uses -o instead of -j for xenial/bionic compatibility route_infos = subprocess.check_output( ["ip", "-o", "route", "show", "default", "0.0.0.0/0"], @@ -220,8 +198,8 @@ def get_any_interface(cls): return route_info_fields[4] raise ValueError("Unable to determine any valid interface") - @classmethod - def from_ip(cls, ip: str): + @staticmethod + def from_ip(ip: str): """ Build an instance of Route given an ip, if no ip is provided the best interface that can route to 0.0.0.0/0 is selected (as described by @@ -233,11 +211,11 @@ def from_ip(cls, ip: str): return Route(Route.get_any_interface()) -def is_reachable(ip, interface, verbose=False): +def is_reachable(ip, interface): """ Ping an ip to see if it is reachable """ - result = ping(ip, interface, 3, 10, verbose=verbose) + result = ping(ip, interface) return result["transmitted"] >= result["received"] > 0 @@ -259,7 +237,7 @@ def get_default_gateway_reachable_on(interface: str) -> str: ) -def get_any_host_reachable_on(interface: str, verbose=False) -> str: +def get_any_host_reachable_on(interface: str) -> str: """ Returns any host that it can reach from a given interface """ @@ -273,7 +251,7 @@ def get_any_host_reachable_on(interface: str, verbose=False) -> str: ) # retry a few times to get something in the arp table for i in range(10): - ping(broadcast, interface, 1, 1, broadcast=True, verbose=verbose) + ping(broadcast, interface, broadcast=True) # Get output from arp -a -n to get known IPs arp_table = subprocess.check_output( ["arp", "-a", "-n"], universal_newlines=True @@ -296,9 +274,7 @@ def get_any_host_reachable_on(interface: str, verbose=False) -> str: ) -def get_host_to_ping( - interface: str, target: str = None, verbose=False -) -> "str|None": +def get_host_to_ping(interface: str, target: str = None) -> "str|None": """ Attempts to determine a reachable host to ping on the specified network interface. First it tries to ping the provided target. If no target is @@ -331,10 +307,9 @@ def get_host_to_ping( def ping( host: str, interface: "str|None", - count: int, - deadline: int, + count: int = 2, + deadline: int = 4, broadcast=False, - verbose=False, ): """ pings an host via an interface count times within the given deadline. @@ -373,8 +348,8 @@ def ping( str(e), e.stdout, e.stderr ) return ping_summary - if verbose: - print(output) + + print(output) try: received = next(re.finditer(reg, output)) ping_summary = { @@ -392,8 +367,6 @@ def ping( def parse_args(argv): - default_count = 2 - default_delay = 4 parser = argparse.ArgumentParser() parser.add_argument( "host", @@ -401,68 +374,21 @@ def parse_args(argv): default=None, help=_("host to ping"), ) - parser.add_argument( - "-c", - "--count", - default=default_count, - type=int, - help=_("number of packets to send"), - ) - parser.add_argument( - "-d", - "--deadline", - default=default_delay, - type=int, - help=_("timeout in seconds"), - ) - parser.add_argument( - "-t", - "--threshold", - default=0, - type=int, - help=_("allowed packet loss percentage (default: %(default)s)"), + iface_mutex_group = parser.add_mutually_exclusive_group() + iface_mutex_group.add_argument( + "-I", + "--interface", + help=_("use specified interface to send packets"), + action="append", + dest="interfaces", + default=[None], ) - parser.add_argument( - "-v", "--verbose", action="store_true", help=_("be verbose") - ) - parser.add_argument( - "-I", "--interface", help=_("use specified interface to send packets") + iface_mutex_group.add_argument( + "--any-cable-interface", + help=_("use any cable interface to send packets"), + action="store_true", ) - args = parser.parse_args(argv) - # Ensure count and deadline make sense. Adjust them if not. - if args.deadline != default_delay and args.count != default_count: - # Ensure they're both consistent, and exit with a warning if not, - # rather than modifying what the user explicitly set. - if args.deadline <= args.count: - # FIXME: this cannot ever be translated correctly - raise SystemExit( - _( - "ERROR: not enough time for {0} pings in {1} seconds" - ).format(args.count, args.deadline) - ) - elif args.deadline != default_delay: - # Adjust count according to delay. - args.count = args.deadline - 1 - if args.count < 1: - args.count = 1 - if args.verbose: - # FIXME: this cannot ever be translated correctly - print( - _( - "Adjusting ping count to {0} to fit in {1}-second deadline" - ).format(args.count, args.deadline) - ) - else: - # Adjust delay according to count - args.deadline = args.count + 1 - if args.verbose: - # FIXME: this cannot ever be translated correctly - print( - _("Adjusting deadline to {0} seconds to fit {1} pings").format( - args.deadline, args.count - ) - ) - return args + return parser.parse_args(argv) def main(argv) -> int: @@ -474,17 +400,18 @@ def main(argv) -> int: args = parse_args(argv) + if args.any_cable_interface: + print(_("Looking for all cable interfaces...")) + all_ifaces = get_default_gateways().keys() + args.interfaces = list(filter(is_cable_interface, all_ifaces)) + # If given host is not pingable, override with something pingable. - host = get_host_to_ping( - interface=args.interface, verbose=args.verbose, target=args.host - ) - if args.verbose: - print(_("Checking connectivity to {0}").format(host)) + host = get_host_to_ping(interface=args.interfaces[0], target=args.host) + + print(_("Checking connectivity to {0}").format(host)) if host: - ping_summary = ping( - host, args.interface, args.count, args.deadline, args.verbose - ) + ping_summary = ping(host, args.interfaces[0]) else: ping_summary = { "received": 0, @@ -492,34 +419,53 @@ def main(argv) -> int: } if ping_summary["received"] == 0: - print(_("No Internet connection")) + print(_("FAIL: All packet loss.")) if ping_summary.get("cause"): print("Possible cause: {}".format(ping_summary["cause"])) return 1 elif ping_summary["transmitted"] != ping_summary["received"]: - print( - _("Connection established, but lost {0}% of packets").format( - ping_summary["pct_loss"] - ) - ) - if ping_summary["pct_loss"] > args.threshold: - print( - _( - "FAIL: {0}% packet loss is higher than {1}% threshold" - ).format(ping_summary["pct_loss"], args.threshold) - ) - return 1 - else: - print( - _("PASS: {0}% packet loss is within {1}% threshold").format( - ping_summary["pct_loss"], args.threshold - ) - ) - return 0 + print(_("FAIL: {0}% packet loss.").format(ping_summary["pct_loss"])) + return 1 else: - print(_("Connection to test host fully established")) + print(_("PASS: 0% packet loss").format(host)) return 0 +def get_default_gateways() -> Dict[str, str]: + """ + Use `route` program to find default gateways for all interfaces. + + returns a dictionary in a form of {interface_name: gateway} + """ + try: + routes = subprocess.check_output( + ["route", "-n"], universal_newlines=True + ) + except subprocess.CalledProcessError as exc: + logging.debug("Failed to run `route -n `", exc) + return {} + regex = r"^0\.0\.0\.0\s+(?P[\w.]+)\s.*\s(?P[\w.]+)$" + matches = re.finditer(regex, routes, re.MULTILINE) + + return {m.group("interface"): m.group("gw") for m in matches} + + +def is_cable_interface(interface: str) -> bool: + """ + Check if the interface is a cable interface. + This is a simple heuristic that checks if the interface is named + "enX" or "ethX" where X is a number. + + :param interface: the interface name to check + :return: True if the interface is a cable interface, False otherwise + + Looking at the `man 7 systemd.net-naming-scheme` we can see that + even the `eth` matching may be an overkill. + """ + if not isinstance(interface, str) or not interface: + return False + return interface.startswith("en") or interface.startswith("eth") + + if __name__ == "__main__": sys.exit(main(sys.argv[1:])) diff --git a/providers/base/bin/wifi_client_test_netplan.py b/providers/base/bin/wifi_client_test_netplan.py index f31448045..7a4f408fb 100755 --- a/providers/base/bin/wifi_client_test_netplan.py +++ b/providers/base/bin/wifi_client_test_netplan.py @@ -256,7 +256,7 @@ def perform_ping_test(interface): if target: count = 5 - result = ping(target, interface, count, 10, verbose=True) + result = ping(target, interface, count, 10) if result['received'] == count: return True diff --git a/providers/base/bin/wifi_nmcli_test.py b/providers/base/bin/wifi_nmcli_test.py index 91000bbaf..6f75bdfd4 100755 --- a/providers/base/bin/wifi_nmcli_test.py +++ b/providers/base/bin/wifi_nmcli_test.py @@ -119,7 +119,7 @@ def perform_ping_test(interface): if target: count = 5 - result = ping(target, interface, count, 10, verbose=True) + result = ping(target, interface, count, 10) if result['received'] == count: return True diff --git a/providers/base/tests/test_gateway_ping_test.py b/providers/base/tests/test_gateway_ping_test.py index ecebc6a2b..690b7cf93 100644 --- a/providers/base/tests/test_gateway_ping_test.py +++ b/providers/base/tests/test_gateway_ping_test.py @@ -11,6 +11,8 @@ get_default_gateway_reachable_on, get_any_host_reachable_on, get_host_to_ping, + get_default_gateways, + is_cable_interface, ) @@ -46,9 +48,7 @@ def test__get_default_gateway_from_ip_invalid_route( self.assertIsNone(Route._get_default_gateway_from_ip(self_mock)) @patch("subprocess.check_output") - def test__get_default_gateway_from_ip_crash( - self, mock_check_output - ): + def test__get_default_gateway_from_ip_crash(self, mock_check_output): mock_check_output.side_effect = subprocess.CalledProcessError(1, "") self_mock = MagicMock() self_mock.interface = "eth0" @@ -92,35 +92,33 @@ def _num_to_dotted_quad(x): Route._get_default_gateway_from_proc(self_mock), ) - @patch("subprocess.check_output") - def test__get_default_gateway_from_bin_route_nominal( - self, mock_check_output - ): - mock_check_output.return_value = textwrap.dedent( - """ - Kernel IP routing table - Destination Gateway Genmask Flags Metric Ref Use Iface - 0.0.0.0 192.168.1.1 0.0.0.0 UG 100 0 0 enp5s0 - 0.0.0.0 192.168.1.100 0.0.0.0 UG 600 0 0 wlan0 - """ - ) + @patch("gateway_ping_test.get_default_gateways") + def test__get_default_gateway_from_bin_route_nominal(self, mock_get_d_gws): + mock_get_d_gws.return_value = { + "enp5s0": "192.168.1.1", + "wlan0": "192.168.1.100", + } self_mock = MagicMock() self_mock.interface = "wlan0" gateway = Route._get_default_gateway_from_bin_route(self_mock) self.assertEqual(gateway, "192.168.1.100") - @patch("subprocess.check_output") + @patch("gateway_ping_test.get_default_gateways") def test__get_default_gateway_from_bin_route_if_not_found( - self, mock_check_output + self, mock_get_d_gws ): - mock_check_output.return_value = textwrap.dedent( - """ - Kernel IP routing table - Destination Gateway Genmask Flags Metric Ref Use Iface - 0.0.0.0 192.168.1.1 0.0.0.0 UG 100 0 0 enp5s0 - 0.0.0.0 192.168.1.100 0.0.0.0 UG 600 0 0 wlan0 - """ - ) + mock_get_d_gws.return_value = { + "enp5s0": "192.168.1.1", + "wlan0": "192.168.1.100", + } + self_mock = MagicMock() + self_mock.interface = "enp1s0" + gateway = Route._get_default_gateway_from_bin_route(self_mock) + self.assertIsNone(gateway) + + @patch("gateway_ping_test.get_default_gateways") + def test__get_default_gateway_from_bin_route_empty(self, mock_get_d_gws): + mock_get_d_gws.return_value = {} self_mock = MagicMock() self_mock.interface = "enp1s0" gateway = Route._get_default_gateway_from_bin_route(self_mock) @@ -284,6 +282,14 @@ def test_from_ip_none(self, mock_get_interface_from_ip): self.assertEqual(Route.from_ip(None).interface, "enp6s0") self.assertTrue(mock_get_interface_from_ip.called) + @patch("subprocess.check_output", return_value="") + def test_get_ip_addr_info(self, mock_check_output): + self_mock = MagicMock() + self.assertEqual(Route._get_ip_addr_info(self_mock), "") + mock_check_output.assert_called_once_with( + ["ip", "-o", "addr", "show"], universal_newlines=True + ) + class TestUtilityFunctions(unittest.TestCase): @patch("gateway_ping_test.ping") @@ -455,7 +461,7 @@ def test_ping_ok(self, mock_check_output): mock_check_output.return_value = ( "4 packets transmitted, 4 received, 0% packet loss" ) - result = ping("8.8.8.8", "eth0", 4, 5, verbose=True) + result = ping("8.8.8.8", "eth0") self.assertEqual(result["transmitted"], 4) self.assertEqual(result["received"], 4) self.assertEqual(result["pct_loss"], 0) @@ -463,13 +469,13 @@ def test_ping_ok(self, mock_check_output): @patch("subprocess.check_output") def test_ping_malformed_output(self, mock_check_output): mock_check_output.return_value = "Malformed output" - result = ping("8.8.8.8", "eth0", 4, 5, verbose=True) + result = ping("8.8.8.8", "eth0") self.assertIn("Failed to parse", result["cause"]) @patch("subprocess.check_output") def test_ping_no_ping(self, mock_check_output): mock_check_output.side_effect = FileNotFoundError("ping not found") - result = ping("8.8.8.8", "eth0", 4, 5, verbose=True) + result = ping("8.8.8.8", "eth0") self.assertEqual(result["cause"], str(mock_check_output.side_effect)) @patch("subprocess.check_output") @@ -479,7 +485,7 @@ def test_ping_failure(self, mock_check_output): 1, "ping", "ping: unknown host" ) ) - result = ping("invalid.host", None, 4, 5) + result = ping("invalid.host", None) # Since the function does not return a detailed error for general # failures, we just check for non-success self.assertNotEqual( @@ -494,7 +500,7 @@ def test_ping_failure_broadcast(self, mock_check_output): 1, "ping", stderr="SO_BINDTODEVICE: Operation not permitted" ) ) - result = ping("255.255.255.255", None, 4, 5, broadcast=True) + result = ping("255.255.255.255", None, broadcast=True) self.assertIsNone(result) @@ -503,9 +509,19 @@ class TestMainFunction(unittest.TestCase): @patch("gateway_ping_test.ping") def test_no_internet_connection_no_cause( self, mock_ping, mock_get_host_to_ping + ): + mock_get_host_to_ping.return_value = "1.1.1.1" + mock_ping.return_value = {"received": 0} + result = main(["1.1.1.1"]) + self.assertEqual(result, 1) + + @patch("gateway_ping_test.get_host_to_ping") + @patch("gateway_ping_test.ping") + def test_no_internet_connection_auto_cause( + self, mock_ping, mock_get_host_to_ping ): mock_get_host_to_ping.return_value = None - mock_ping.return_value = None + mock_ping.return_value = {"received": 0} result = main(["1.1.1.1"]) self.assertEqual(result, 1) @@ -520,28 +536,27 @@ def test_no_internet_connection_cause( @patch("gateway_ping_test.get_host_to_ping") @patch("gateway_ping_test.ping") - def test_packet_loss_within_threshold( + def test_spotty_connection_with_cause( self, mock_ping, mock_get_host_to_ping ): mock_ping.return_value = { - "transmitted": 100, - "received": 95, - "pct_loss": 5, + "received": 1, + "transmitted": 2, + "pct_loss": 50, + "cause": "Test cause", } - result = main(["1.1.1.1", "-t", "10"]) - self.assertEqual(result, 0) + result = main(["1.1.1.1"]) + self.assertEqual(result, 1) @patch("gateway_ping_test.get_host_to_ping") @patch("gateway_ping_test.ping") - def test_packet_loss_exceeding_threshold( - self, mock_ping, mock_get_host_to_ping - ): + def test_some_packet_loss(self, mock_ping, mock_get_host_to_ping): mock_ping.return_value = { "transmitted": 100, - "received": 80, - "pct_loss": 20, + "received": 95, + "pct_loss": 5, } - result = main(["1.1.1.1", "-t", "10"]) + result = main(["1.1.1.1"]) self.assertEqual(result, 1) @patch("gateway_ping_test.get_host_to_ping") @@ -555,30 +570,80 @@ def test_full_connectivity(self, mock_ping, mock_get_host_to_ping): result = main(["1.1.1.1"]) self.assertEqual(result, 0) - @patch("gateway_ping_test.get_host_to_ping") + @patch("gateway_ping_test.is_reachable", return_value=True) + @patch("gateway_ping_test.get_default_gateways") @patch("gateway_ping_test.ping") - def test_verbose_output(self, mock_ping, mock_get_host_to_ping): - mock_ping.return_value = { - "transmitted": 100, - "received": 100, - "pct_loss": 0, + def test_main_any_cable(self, mock_ping, mock_get_default_gateways, _): + mock_get_default_gateways.return_value = { + "enp5s0": "192.168.1.1", + "wlan0": "192.168.1.2", } - result = main(["1.1.1.1", "-v"]) - self.assertEqual(result, 0) + main(["--any-cable-interface"]) + mock_ping.assert_called_once_with("192.168.1.1", "enp5s0") - @patch("gateway_ping_test.get_host_to_ping") - @patch("gateway_ping_test.ping") - def test_invalid_arguments_count_deadline( - self, mock_ping, mock_get_host_to_ping - ): - with self.assertRaises(SystemExit): - main(["-c", "10", "-d", "8"]) - def test_adjust_count_based_on_non_default_deadline(self): - # Assuming default_delay is 4 - args = parse_args(["-d", "1", "-v"]) - self.assertEqual( - args.count, - 1, - "Count should be adjusted based on the non-default deadline", +class GetDefaultGatewaysTests(unittest.TestCase): + @patch("subprocess.check_output") + def test_get_default_gateways_nominal(self, mock_check_output): + mock_check_output.return_value = textwrap.dedent( + """ + Kernel IP routing table + Destination Gateway Genmask Flags Metric Ref Use Iface + 0.0.0.0 192.168.1.1 0.0.0.0 UG 100 0 0 enp5s0 + 0.0.0.0 192.168.1.100 0.0.0.0 UG 600 0 0 wlan0 + """ ) + gateways = get_default_gateways() + self.assertDictEqual( + gateways, {"enp5s0": "192.168.1.1", "wlan0": "192.168.1.100"} + ) + + @patch("subprocess.check_output") + def test_get_default_gateways_no_default(self, mock_check_output): + mock_check_output.return_value = textwrap.dedent( + """ + Kernel IP routing table + Destination Gateway Genmask Flags Metric Ref Use Iface + 192.168.1.0 192.168.1.1 0.0.0.0 UG 100 0 0 enp5s0 + """ + ) + gateways = get_default_gateways() + self.assertDictEqual(gateways, {}) + + @patch("subprocess.check_output") + def test_get_default_gateways_cant_run_route(self, mock_check_output): + mock_check_output.side_effect = subprocess.CalledProcessError(1, "") + gateways = get_default_gateways() + self.assertDictEqual(gateways, {}) + + +class IsCableInterfaceTests(unittest.TestCase): + def test_is_cable_interface_nominal(self): + self.assertTrue(is_cable_interface("eth0")) + + def test_is_cable_interface_nominal_2(self): + self.assertTrue(is_cable_interface("enp5s0")) + + def test_is_cable_interface_nominal_3(self): + self.assertTrue(is_cable_interface("enp0s25")) + + def test_is_cable_interface_nope(self): + self.assertFalse(is_cable_interface("wlan0")) + + def test_is_cable_interface_nope_2(self): + self.assertFalse(is_cable_interface("wlp3s0")) + + def test_is_cable_interface_nope_3(self): + self.assertFalse(is_cable_interface("wwan0")) + + def test_is_cable_interface_nope_4(self): + self.assertFalse(is_cable_interface("tun0")) + + def test_is_cable_interface_nope_5(self): + self.assertFalse(is_cable_interface("lo")) + + def test_is_cable_interface_empty(self): + self.assertFalse(is_cable_interface("")) + + def test_is_cable_interface_not_string(self): + self.assertFalse(is_cable_interface(123)) diff --git a/providers/base/units/ethernet/jobs.pxu b/providers/base/units/ethernet/jobs.pxu index eaa5e2968..16ec79b46 100644 --- a/providers/base/units/ethernet/jobs.pxu +++ b/providers/base/units/ethernet/jobs.pxu @@ -206,7 +206,7 @@ _summary: Can ping another machine over Ethernet port {interface} _description: Check Ethernet works by pinging another machine plugin: shell command: - gateway_ping_test.py -v --interface {interface} + gateway_ping_test.py --interface {interface} category_id: com.canonical.plainbox::ethernet estimated_duration: 4.0 flags: preserve-locale also-after-suspend @@ -299,7 +299,7 @@ _steps: 2. Follow the instructions on the screen. plugin: user-interact command: - eth_hotplugging.py {{ interface }} && gateway_ping_test.py -v --interface {{ interface }} + eth_hotplugging.py {{ interface }} && gateway_ping_test.py --interface {{ interface }} category_id: com.canonical.plainbox::ethernet estimated_duration: 60.0 user: root