Skip to content

Commit

Permalink
test: refactor iface.py
Browse files Browse the repository at this point in the history
  • Loading branch information
axkar committed Oct 16, 2024
1 parent eb79adc commit 4d4a4c6
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 66 deletions.
2 changes: 1 addition & 1 deletion test/case/ietf_interfaces/iface_phys_address/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def calc_mac(base_mac, mac_offset):
def reset_mac(tgt, port, mac):
"""Reset DUT interface MAC address to default."""
node = "infix-interfaces:custom-phys-address"
xpath = iface.get_iface_xpath(port, node)
xpath = iface.get_xpath(port, node)
tgt.delete_xpath(xpath)
with test.step("Verify target:data MAC address is reset to default"):
until(lambda: iface.get_phys_address(tgt, tport) == mac)
Expand Down
4 changes: 2 additions & 2 deletions test/case/ietf_interfaces/verify_all_interface_types/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@


def verify_interface(target, interface, expected_type):
assert iface.interface_exist(target, interface), f"Interface <{interface}> does not exist."
assert iface.exist(target, interface), f"Interface <{interface}> does not exist."

expected_type = f"infix-if-type:{expected_type}"
actual_type = iface._iface_get_param(target, interface, "type")
actual_type = iface.get_param(target, interface, "type")

if expected_type == "infix-if-type:etherlike" and actual_type == "infix-if-type:ethernet":
return # Allow 'etherlike' to match 'ethernet'
Expand Down
12 changes: 6 additions & 6 deletions test/case/ietf_interfaces/veth_delete/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
})

with test.step("Verify interfaces 'veth0a' and 'veth0b' exists"):
assert iface.interface_exist(target, veth0a), \
assert iface.exist(target, veth0a), \
f"Interface <{veth0a}> does not exist."
assert iface.interface_exist(target, veth0b), \
assert iface.exist(target, veth0b), \
f"Interface <{veth0b}> does not exist."

with test.step("Set IP address on target:eth0 (dummy op)"):
Expand Down Expand Up @@ -97,15 +97,15 @@
target = env.attach("target", "mgmt")

with test.step("Verify target:eth0 and target:eth1 still exist"):
assert iface.interface_exist(target, eth0), \
assert iface.exist(target, eth0), \
f"Interface {eth0} missing!"
assert iface.interface_exist(target, eth1), \
assert iface.exist(target, eth1), \
f"Interface {eth1} missing!"

with test.step("Verify VETH pair have been removed"):
assert not iface.interface_exist(target, veth0a), \
assert not iface.exist(target, veth0a), \
f"Interface <{veth0a}> still exists!"
assert not iface.interface_exist(target, veth0b), \
assert not iface.exist(target, veth0b), \
f"Interface <{veth0b}> still exists!"

test.succeed()
25 changes: 15 additions & 10 deletions test/case/ietf_interfaces/vlan_ping/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,31 @@
Very basic test if the VLAN interface configuration works.
"""

import infamy
import infamy.iface as iface
import copy

from infamy import until

def test_ping(hport, should_pass):
with infamy.IsolatedMacVlan(hport) as ns:
pingtest = ns.runsh("""
set -ex
with infamy.IsolatedMacVlan(hport) as ns:
try:
ns.runsh("""
set -ex
ip link set iface up
ip link add dev vlan10 link iface up type vlan id 10
ip addr add 10.0.0.1/24 dev vlan10
""")

ip link set iface up
ip link add dev vlan10 link iface up type vlan id 10
ip addr add 10.0.0.1/24 dev vlan10
""")
if(should_pass):
if should_pass:
ns.must_reach("10.0.0.2")
else:
ns.must_not_reach("10.0.0.2")

except Exception as e:
print(f"An error occurred during the VLAN setup or ping test: {e}")
raise

with infamy.Test() as test:
with test.step("Set up topology and attach to target DUT"):
env = infamy.Env()
Expand Down Expand Up @@ -60,7 +65,7 @@ def test_ping(hport, should_pass):
})

with test.step("Waiting for links to come up"):
until(lambda: iface.get_oper_up(target, tport))
until(lambda: iface.get_param(target, tport, "oper-status") == "up")

with test.step("Ping 10.0.0.2 from VLAN 10 on host:data with IP 10.0.0.1"):
_, hport = env.ltop.xlate("host", "data")
Expand Down
54 changes: 8 additions & 46 deletions test/infamy/iface.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
Fetch interface status from remote device.
"""

def get_iface_xpath(iface, path=None):
def get_xpath(iface, path=None):
"""Compose complete XPath to a YANG node in /ietf-interfaces"""
xpath=f"/ietf-interfaces:interfaces/interface[name='{iface}']"
if not path is None:
xpath=f"{xpath}/{path}"
return xpath

def _iface_extract_param(json_content, param):
def _extract_param(json_content, param):
"""Returns (extracted) value for parameter 'param'"""
interfaces = json_content.get('interfaces')
if not interfaces:
Expand All @@ -22,16 +22,16 @@ def _iface_extract_param(json_content, param):

return None

def _iface_get_param(target, iface, param=None):
def get_param(target, iface, param=None):
"""Fetch target dict for iface and extract param from JSON"""
content = target.get_data(get_iface_xpath(iface, param))
content = target.get_data(get_xpath(iface, param))
if content is None:
return None
return _iface_extract_param(content, param)
return _extract_param(content, param)

def interface_exist(target, iface):
def exist(target, iface):
"""Verify that the target interface exists"""
return _iface_get_param(target, iface, "name") is not None
return get_param(target, iface, "name") is not None

def address_exist(target, iface, address, prefix_length = 24, proto="dhcp"):
"""Check if 'address' is set on iface"""
Expand All @@ -56,47 +56,9 @@ def get_ipv4_address(target, iface):
return None
return ipv4['address']

def get_if_index(target, iface):
"""Fetch interface 'if-index' (operational status)"""
return _iface_get_param(target, iface, "if-index")

def get_oper_status(target, iface):
"""Fetch interface 'oper-status' (operational status)"""
return _iface_get_param(target, iface, "oper-status")

def get_phys_address(target, iface):
"""Fetch interface MAC address (operational status)"""
return _iface_get_param(target, iface, "phys-address")

def get_oper_up(target,iface):
state=get_oper_status(target,iface)
return state == "up"

def print_iface(target, iface):
data = target.get_data(_iface_xpath(iface, None))
print(data)

def print_all(target):
"""Print status parameters for all target interfaces"""
try:
content = target.get_dict("/ietf-interfaces:interfaces")
interfaces = content.get('interfaces')
if interfaces:
interface_list = interfaces.get('interface')
if interface_list and isinstance(interface_list, list):
col1 = "name"
col2 = "if-index"
col3 = "oper-status"
print('-'*36)
print(f"{col1: <12}{col2: <12}{col3: <12}")
print('-'*36)
for interface in interface_list:
print(f"{interface['name']: <12}"
f"{interface['if-index']: <12}"
f"{interface['oper-status']: <12}")
print('-'*36)
except:
print(f"Failed to get interfaces' status from target {target}")
return get_param(target, iface, "phys-address")

def exist_bridge_multicast_filter(target, group, iface, bridge):
# The interface array is different in restconf/netconf, netconf has a keyed list but
Expand Down
2 changes: 1 addition & 1 deletion test/infamy/netconf.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ def get_schema(self, schema, outdir):

def get_iface(self, name):
"""Fetch target dict for iface and extract param from JSON"""
content = self.get_data(iface.get_iface_xpath(name))
content = self.get_data(iface.get_xpath(name))
interface = content.get("interfaces", {}).get("interface", None)

if interface is None:
Expand Down

0 comments on commit 4d4a4c6

Please sign in to comment.