diff --git a/test/case/ietf_hardware/usb.py b/test/case/ietf_hardware/usb.py index 54cee7c54..65e4812ad 100755 --- a/test/case/ietf_hardware/usb.py +++ b/test/case/ietf_hardware/usb.py @@ -16,13 +16,14 @@ if len(available) < 1: test.skip() - with test.step("Lock USB ports"): + with test.step("Unlock USB ports"): components=[] for port in available: component = { "name": port, + "class": "infix-hardware:usb", "state": { - "admin-state": "locked" + "admin-state": "unlocked" } } components.append(component) @@ -33,18 +34,18 @@ } }) - with test.step("Verify USB ports locked"): + with test.step("Verify USB ports unlocked"): for port in available: - until(lambda: usb.get_usb_state(target, port) == "locked") - + until(lambda: usb.get_usb_state(target, port) == "unlocked") - with test.step("Unlock USB ports"): - components=[] - for port in available: + if len(available) > 1: + with test.step("Lock one port"): + components=[] component = { - "name": port, + "name": available[1], + "class": "infix-hardware:usb", "state": { - "admin-state": "unlocked" + "admin-state": "locked" } } components.append(component) @@ -54,18 +55,16 @@ "component": components } }) + with test.step("Verify one port is locked and one unlocked"): + until(lambda: usb.get_usb_state(target, available[1]) == "locked") + until(lambda: usb.get_usb_state(target, available[0]) == "unlocked") - - with test.step("Verify USB ports unlocked"): + with test.step("Lock USB ports"): + components=[] for port in available: - until(lambda: usb.get_usb_state(target, port) == "unlocked") - - if len(available) > 1: - with test.step("Lock one port"): - components=[] component = { - "name": available[1], "class": "infix-hardware:usb", + "name": port, "state": { "admin-state": "locked" } @@ -77,10 +76,10 @@ "component": components } }) - with test.step("Verify one port is locked and one unlocked"): - until(lambda: usb.get_usb_state(target, available[1]) == "locked") - until(lambda: usb.get_usb_state(target, available[0]) == "unlocked") + with test.step("Verify USB ports locked"): + for port in available: + until(lambda: usb.get_usb_state(target, port) == "locked") with test.step("Remove all hardware configuration"): for port in available: target.delete_xpath(f"/ietf-hardware:hardware/component[name='{port}']") @@ -112,16 +111,15 @@ for port in available: until(lambda: usb.get_usb_state(target, port) == "unlocked") - with test.step("Remove USB configuration, and reboot"): - for port in available: - target.delete_xpath(f"/ietf-hardware:hardware/component[name='{port}']") + with test.step("Save to startup and reboot"): + target.startup_override() target.copy("running", "startup") target.reboot() if wait_boot(target) == False: test.fail() - target = env.attach("target", "mgmt", factory_default = False) + target = env.attach("target", "mgmt", test_reset=False) - with test.step("Verify that all ports are locked"): + with test.step("Verify that all ports are unlocked"): for port in available: - until(lambda: usb.get_usb_state(target, port) == "locked") + until(lambda: usb.get_usb_state(target, port) == "unlocked") test.succeed() diff --git a/test/case/misc/start_from_startup.py b/test/case/misc/start_from_startup.py index 42c8c7ac0..839f9a357 100755 --- a/test/case/misc/start_from_startup.py +++ b/test/case/misc/start_from_startup.py @@ -18,11 +18,12 @@ target.delete_xpath("/ietf-hardware:hardware/component") target.copy("running", "startup") with test.step("Reboot and wait for the unit to come back"): + target.startup_override() target.copy("running", "startup") target.reboot() if wait_boot(target) == False: test.fail() - target = env.attach("target", "mgmt", factory_default = False) + target = env.attach("target", "mgmt", test_default = False) with test.step("Verify hostname"): data = target.get_dict("/ietf-system:system/hostname") diff --git a/test/infamy/env.py b/test/infamy/env.py index d3e8b9dd2..69ad60a2f 100644 --- a/test/infamy/env.py +++ b/test/infamy/env.py @@ -63,7 +63,7 @@ def attr(self, name, default=None): def get_password(self, node): return self.ptop.get_password(node) - def attach(self, node, port, protocol=None, factory_default=True): + def attach(self, node, port, protocol=None, test_reset=True): if self.ltop: mapping = self.ltop.mapping[node] node, port = self.ltop.xlate(node, port) @@ -88,20 +88,24 @@ def attach(self, node, port, protocol=None, factory_default=True): password = self.get_password(node) if protocol == "netconf": - return netconf.Device( + dev = netconf.Device( location=netconf.Location(cport, mgmtip, password), mapping=mapping, - yangdir=self.args.yangdir, - factory_default=factory_default - ) + yangdir=self.args.yangdir) + if test_reset: + dev.test_reset() + return dev + if protocol == "ssh": return ssh.Device(ssh.Location(mgmtip, password)) if protocol == "restconf": - return restconf.Device(location=restconf.Location(cport, + dev = restconf.Device(location=restconf.Location(cport, mgmtip, password), mapping=mapping, - yangdir=self.args.yangdir, - factory_default=factory_default) + yangdir=self.args.yangdir) + if test_reset: + dev.test_reset() + return dev raise Exception(f"Unsupported management procotol \"{protocol}\"") diff --git a/test/infamy/netconf.py b/test/infamy/netconf.py index 3b75e4695..fe5f6b3b8 100644 --- a/test/infamy/netconf.py +++ b/test/infamy/netconf.py @@ -96,8 +96,7 @@ class Device(Transport): def __init__(self, location: Location, mapping: dict, - yangdir: None | str = None, - factory_default=True): + yangdir: None | str = None): print("Testing using NETCONF") self.location = location @@ -114,12 +113,6 @@ def __init__(self, del self.ly self.ly = libyang.Context(yangdir) self._ly_init(yangdir) - if factory_default: - try: - self.ncc.dispatch('') - except RpcError as err: - print(f"Failed sending factory-default RPC: {err}") - raise err def _ncc_init(self, location): ai = socket.getaddrinfo(location.host, location.port, diff --git a/test/infamy/restconf.py b/test/infamy/restconf.py index 162c5a465..4e35af383 100644 --- a/test/infamy/restconf.py +++ b/test/infamy/restconf.py @@ -102,8 +102,7 @@ class Device(Transport): def __init__(self, location: Location, mapping: dict, - yangdir: None | str = None, - factory_default = True): + yangdir: None | str = None): print("Testing using RESTCONF") self.location=location self.url_base=f"https://[{location.host}]:{location.port}" @@ -121,9 +120,6 @@ def __init__(self, self._ly_bootstrap(yangdir) self._ly_init(yangdir) - if factory_default: - self.factory_default() - def get_schemas_list(self): data=self.get_operational("/ietf-yang-library:modules-state").print_dict() return data["modules-state"]["module"] @@ -236,11 +232,23 @@ def get_config_dict(self, modname): def put_config_dict(self, modname, edit): """Add @edit to running config and put the whole configuration""" + # This is hacky, refactor when rousette have PATCH support. running=self.get_running() mod=self.lyctx.get_module(modname) + + for k, v in edit.items(): + module=modname+":"+k + data=v + break + + # Ugly hack, but this function should be refactored when patch is avaible in rousette anyway. + rundict = json.loads(running.print_mem("json", with_siblings=True, pretty=False)) + if rundict.get(module) is None: + rundict[module] = {} + running=self.lyctx.parse_data_mem(json.dumps(rundict), "json", parse_only=True) + change=mod.parse_data_dict(edit, no_state=True, validate=False) running.merge_module(change) - return self.put_datastore("running", json.loads(running.print_mem("json", with_siblings=True, pretty=False))) def call_rpc(self, rpc): @@ -285,10 +293,6 @@ def copy(self, source, target): def reboot(self): self.call_rpc("ietf-system:system-restart") - def factory_default(self): - """Factory reset target""" - return self.call_rpc("infix-factory-default:factory-default") - def call_action(self, xpath): path=xpath_to_uri(xpath) url=f"{self.restconf_url}/data{path}" diff --git a/test/infamy/transport.py b/test/infamy/transport.py index 924cebe91..5a6ce7ff9 100644 --- a/test/infamy/transport.py +++ b/test/infamy/transport.py @@ -48,3 +48,9 @@ def reachable(self): """Check if the device reachable on ll6""" neigh = ll6ping(self.location.interface, flags=["-w1", "-c1", "-L", "-n"]) return bool(neigh) + + def test_reset(self): + self.call_action("/infix-test:test/reset") + + def startup_override(self): + self.call_action("/infix-test:test/override-startup")