diff --git a/test/case/ietf_interfaces/iface_phys_address/test.py b/test/case/ietf_interfaces/iface_phys_address/test.py index 8d5147b64..e8dcbb8de 100755 --- a/test/case/ietf_interfaces/iface_phys_address/test.py +++ b/test/case/ietf_interfaces/iface_phys_address/test.py @@ -30,7 +30,7 @@ def calc_mac(base_mac, mac_offset): def reset_mac(tgt, port, mac): """Reset DUT interface MAC address to default.""" node = "infix-interfaces:custom-phys-address" - xpath = iface.get_iface_xpath(port, node) + xpath = iface.get_xpath(port, node) tgt.delete_xpath(xpath) with test.step("Verify target:data MAC address is reset to default"): until(lambda: iface.get_phys_address(tgt, tport) == mac) diff --git a/test/case/ietf_interfaces/verify_all_interface_types/test.py b/test/case/ietf_interfaces/verify_all_interface_types/test.py index e00585b43..c10508fd1 100755 --- a/test/case/ietf_interfaces/verify_all_interface_types/test.py +++ b/test/case/ietf_interfaces/verify_all_interface_types/test.py @@ -22,10 +22,10 @@ def verify_interface(target, interface, expected_type): - assert iface.interface_exist(target, interface), f"Interface <{interface}> does not exist." + assert iface.exist(target, interface), f"Interface <{interface}> does not exist." expected_type = f"infix-if-type:{expected_type}" - actual_type = iface._iface_get_param(target, interface, "type") + actual_type = iface.get_param(target, interface, "type") if expected_type == "infix-if-type:etherlike" and actual_type == "infix-if-type:ethernet": return # Allow 'etherlike' to match 'ethernet' diff --git a/test/case/ietf_interfaces/veth_delete/test.py b/test/case/ietf_interfaces/veth_delete/test.py index 047a83cc9..366cb344d 100755 --- a/test/case/ietf_interfaces/veth_delete/test.py +++ b/test/case/ietf_interfaces/veth_delete/test.py @@ -51,9 +51,9 @@ }) with test.step("Verify interfaces 'veth0a' and 'veth0b' exists"): - assert iface.interface_exist(target, veth0a), \ + assert iface.exist(target, veth0a), \ f"Interface <{veth0a}> does not exist." - assert iface.interface_exist(target, veth0b), \ + assert iface.exist(target, veth0b), \ f"Interface <{veth0b}> does not exist." with test.step("Set IP address on target:eth0 (dummy op)"): @@ -97,15 +97,15 @@ target = env.attach("target", "mgmt") with test.step("Verify target:eth0 and target:eth1 still exist"): - assert iface.interface_exist(target, eth0), \ + assert iface.exist(target, eth0), \ f"Interface {eth0} missing!" - assert iface.interface_exist(target, eth1), \ + assert iface.exist(target, eth1), \ f"Interface {eth1} missing!" with test.step("Verify VETH pair have been removed"): - assert not iface.interface_exist(target, veth0a), \ + assert not iface.exist(target, veth0a), \ f"Interface <{veth0a}> still exists!" - assert not iface.interface_exist(target, veth0b), \ + assert not iface.exist(target, veth0b), \ f"Interface <{veth0b}> still exists!" test.succeed() diff --git a/test/case/ietf_interfaces/vlan_ping/test.py b/test/case/ietf_interfaces/vlan_ping/test.py index 8220d3ded..8639f332f 100755 --- a/test/case/ietf_interfaces/vlan_ping/test.py +++ b/test/case/ietf_interfaces/vlan_ping/test.py @@ -4,26 +4,31 @@ Very basic test if the VLAN interface configuration works. """ + import infamy import infamy.iface as iface -import copy from infamy import until def test_ping(hport, should_pass): - with infamy.IsolatedMacVlan(hport) as ns: - pingtest = ns.runsh(""" - set -ex + with infamy.IsolatedMacVlan(hport) as ns: + try: + ns.runsh(""" + set -ex + ip link set iface up + ip link add dev vlan10 link iface up type vlan id 10 + ip addr add 10.0.0.1/24 dev vlan10 + """) - ip link set iface up - ip link add dev vlan10 link iface up type vlan id 10 - ip addr add 10.0.0.1/24 dev vlan10 - """) - if(should_pass): + if should_pass: ns.must_reach("10.0.0.2") else: ns.must_not_reach("10.0.0.2") + except Exception as e: + print(f"An error occurred during the VLAN setup or ping test: {e}") + raise + with infamy.Test() as test: with test.step("Set up topology and attach to target DUT"): env = infamy.Env() @@ -60,7 +65,7 @@ def test_ping(hport, should_pass): }) with test.step("Waiting for links to come up"): - until(lambda: iface.get_oper_up(target, tport)) + until(lambda: iface.get_param(target, tport, "oper-status") == "up") with test.step("Ping 10.0.0.2 from VLAN 10 on host:data with IP 10.0.0.1"): _, hport = env.ltop.xlate("host", "data") diff --git a/test/infamy/iface.py b/test/infamy/iface.py index 5406f2623..6bfdb5559 100644 --- a/test/infamy/iface.py +++ b/test/infamy/iface.py @@ -2,14 +2,14 @@ Fetch interface status from remote device. """ -def get_iface_xpath(iface, path=None): +def get_xpath(iface, path=None): """Compose complete XPath to a YANG node in /ietf-interfaces""" xpath=f"/ietf-interfaces:interfaces/interface[name='{iface}']" if not path is None: xpath=f"{xpath}/{path}" return xpath -def _iface_extract_param(json_content, param): +def _extract_param(json_content, param): """Returns (extracted) value for parameter 'param'""" interfaces = json_content.get('interfaces') if not interfaces: @@ -22,16 +22,16 @@ def _iface_extract_param(json_content, param): return None -def _iface_get_param(target, iface, param=None): +def get_param(target, iface, param=None): """Fetch target dict for iface and extract param from JSON""" - content = target.get_data(get_iface_xpath(iface, param)) + content = target.get_data(get_xpath(iface, param)) if content is None: return None - return _iface_extract_param(content, param) + return _extract_param(content, param) -def interface_exist(target, iface): +def exist(target, iface): """Verify that the target interface exists""" - return _iface_get_param(target, iface, "name") is not None + return get_param(target, iface, "name") is not None def address_exist(target, iface, address, prefix_length = 24, proto="dhcp"): """Check if 'address' is set on iface""" @@ -56,47 +56,9 @@ def get_ipv4_address(target, iface): return None return ipv4['address'] -def get_if_index(target, iface): - """Fetch interface 'if-index' (operational status)""" - return _iface_get_param(target, iface, "if-index") - -def get_oper_status(target, iface): - """Fetch interface 'oper-status' (operational status)""" - return _iface_get_param(target, iface, "oper-status") - def get_phys_address(target, iface): """Fetch interface MAC address (operational status)""" - return _iface_get_param(target, iface, "phys-address") - -def get_oper_up(target,iface): - state=get_oper_status(target,iface) - return state == "up" - -def print_iface(target, iface): - data = target.get_data(_iface_xpath(iface, None)) - print(data) - -def print_all(target): - """Print status parameters for all target interfaces""" - try: - content = target.get_dict("/ietf-interfaces:interfaces") - interfaces = content.get('interfaces') - if interfaces: - interface_list = interfaces.get('interface') - if interface_list and isinstance(interface_list, list): - col1 = "name" - col2 = "if-index" - col3 = "oper-status" - print('-'*36) - print(f"{col1: <12}{col2: <12}{col3: <12}") - print('-'*36) - for interface in interface_list: - print(f"{interface['name']: <12}" - f"{interface['if-index']: <12}" - f"{interface['oper-status']: <12}") - print('-'*36) - except: - print(f"Failed to get interfaces' status from target {target}") + return get_param(target, iface, "phys-address") def exist_bridge_multicast_filter(target, group, iface, bridge): # The interface array is different in restconf/netconf, netconf has a keyed list but diff --git a/test/infamy/netconf.py b/test/infamy/netconf.py index 86f44c129..5311a2b62 100644 --- a/test/infamy/netconf.py +++ b/test/infamy/netconf.py @@ -374,7 +374,7 @@ def get_schema(self, schema, outdir): def get_iface(self, name): """Fetch target dict for iface and extract param from JSON""" - content = self.get_data(iface.get_iface_xpath(name)) + content = self.get_data(iface.get_xpath(name)) interface = content.get("interfaces", {}).get("interface", None) if interface is None: