From af5831065963e14a53f6b2a4ead4a7e3014220e6 Mon Sep 17 00:00:00 2001 From: David Lobato Date: Wed, 14 Aug 2024 16:59:41 +0100 Subject: [PATCH 1/7] feat(anta.tests): Optimize VerifyRoutingTableEntry by quering all routes for a vrf. (#682) --- anta/tests/routing/generic.py | 35 +++++-- .../units/anta_tests/routing/test_generic.py | 91 +++++++++++++++++++ 2 files changed, 118 insertions(+), 8 deletions(-) diff --git a/anta/tests/routing/generic.py b/anta/tests/routing/generic.py index 89d4bc56f..cd9cf0d24 100644 --- a/anta/tests/routing/generic.py +++ b/anta/tests/routing/generic.py @@ -7,7 +7,8 @@ # mypy: disable-error-code=attr-defined from __future__ import annotations -from ipaddress import IPv4Address, ip_interface +from functools import cache +from ipaddress import IPv4Address, IPv4Interface from typing import ClassVar, Literal from pydantic import model_validator @@ -131,7 +132,10 @@ class VerifyRoutingTableEntry(AntaTest): name = "VerifyRoutingTableEntry" description = "Verifies that the provided routes are present in the routing table of a specified VRF." categories: ClassVar[list[str]] = ["routing"] - commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaTemplate(template="show ip route vrf {vrf} {route}", revision=4)] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [ + AntaTemplate(template="show ip route vrf {vrf} {route}", revision=4), + AntaTemplate(template="show ip route vrf {vrf}", revision=4), + ] class Input(AntaTest.Input): """Input model for the VerifyRoutingTableEntry test.""" @@ -140,20 +144,35 @@ class Input(AntaTest.Input): """VRF context. Defaults to `default` VRF.""" routes: list[IPv4Address] """List of routes to verify.""" + collect: Literal["one", "all"] = "one" + """Route collect behavior: one=one route per command, all=all routes in vrf per command. Defaults to `one`""" def render(self, template: AntaTemplate) -> list[AntaCommand]: - """Render the template for each route in the input list.""" - return [template.render(vrf=self.inputs.vrf, route=route) for route in self.inputs.routes] + """Render the template for the input vrf.""" + if template == VerifyRoutingTableEntry.commands[0] and self.inputs.collect == "one": + return [template.render(vrf=self.inputs.vrf, route=route) for route in self.inputs.routes] + + if template == VerifyRoutingTableEntry.commands[1] and self.inputs.collect == "all": + return [template.render(vrf=self.inputs.vrf)] + + return [] + + @staticmethod + @cache + def ip_interface_ip(route: str) -> IPv4Address: + """Return the IP address of the provided ip route with mask.""" + return IPv4Interface(route).ip @AntaTest.anta_test def test(self) -> None: """Main test function for VerifyRoutingTableEntry.""" - missing_routes = [] + commands_output_route_ips = set() for command in self.instance_commands: - vrf, route = command.params.vrf, command.params.route - if len(routes := command.json_output["vrfs"][vrf]["routes"]) == 0 or route != ip_interface(next(iter(routes))).ip: - missing_routes.append(str(route)) + command_output_vrf = command.json_output["vrfs"][self.inputs.vrf] + commands_output_route_ips |= {self.ip_interface_ip(route) for route in command_output_vrf["routes"]} + + missing_routes = [str(route) for route in self.inputs.routes if route not in commands_output_route_ips] if not missing_routes: self.result.is_success() diff --git a/tests/units/anta_tests/routing/test_generic.py b/tests/units/anta_tests/routing/test_generic.py index 36658f5b2..621cf22ad 100644 --- a/tests/units/anta_tests/routing/test_generic.py +++ b/tests/units/anta_tests/routing/test_generic.py @@ -130,6 +130,48 @@ "inputs": {"vrf": "default", "routes": ["10.1.0.1", "10.1.0.2"]}, "expected": {"result": "success"}, }, + { + "name": "success-collect-all", + "test": VerifyRoutingTableEntry, + "eos_data": [ + { + "vrfs": { + "default": { + "routingDisabled": False, + "allRoutesProgrammedHardware": True, + "allRoutesProgrammedKernel": True, + "defaultRouteState": "notSet", + "routes": { + "10.1.0.1/32": { + "hardwareProgrammed": True, + "routeType": "eBGP", + "routeLeaked": False, + "kernelProgrammed": True, + "routeAction": "forward", + "directlyConnected": False, + "preference": 20, + "metric": 0, + "vias": [{"nexthopAddr": "10.1.255.4", "interface": "Ethernet1"}], + }, + "10.1.0.2/32": { + "hardwareProgrammed": True, + "routeType": "eBGP", + "routeLeaked": False, + "kernelProgrammed": True, + "routeAction": "forward", + "directlyConnected": False, + "preference": 20, + "metric": 0, + "vias": [{"nexthopAddr": "10.1.255.6", "interface": "Ethernet2"}], + }, + }, + }, + }, + }, + ], + "inputs": {"vrf": "default", "routes": ["10.1.0.1", "10.1.0.2"], "collect": "all"}, + "expected": {"result": "success"}, + }, { "name": "failure-missing-route", "test": VerifyRoutingTableEntry, @@ -226,4 +268,53 @@ "inputs": {"vrf": "default", "routes": ["10.1.0.1", "10.1.0.2"]}, "expected": {"result": "failure", "messages": ["The following route(s) are missing from the routing table of VRF default: ['10.1.0.2']"]}, }, + { + "name": "failure-wrong-route-collect-all", + "test": VerifyRoutingTableEntry, + "eos_data": [ + { + "vrfs": { + "default": { + "routingDisabled": False, + "allRoutesProgrammedHardware": True, + "allRoutesProgrammedKernel": True, + "defaultRouteState": "notSet", + "routes": { + "10.1.0.1/32": { + "hardwareProgrammed": True, + "routeType": "eBGP", + "routeLeaked": False, + "kernelProgrammed": True, + "routeAction": "forward", + "directlyConnected": False, + "preference": 20, + "metric": 0, + "vias": [{"nexthopAddr": "10.1.255.4", "interface": "Ethernet1"}], + }, + "10.1.0.55/32": { + "hardwareProgrammed": True, + "routeType": "eBGP", + "routeLeaked": False, + "kernelProgrammed": True, + "routeAction": "forward", + "directlyConnected": False, + "preference": 20, + "metric": 0, + "vias": [{"nexthopAddr": "10.1.255.6", "interface": "Ethernet2"}], + }, + }, + }, + }, + }, + ], + "inputs": {"vrf": "default", "routes": ["10.1.0.1", "10.1.0.2"], "collect": "all"}, + "expected": {"result": "failure", "messages": ["The following route(s) are missing from the routing table of VRF default: ['10.1.0.2']"]}, + }, + { + "name": "collect-input-error", + "test": VerifyRoutingTableEntry, + "eos_data": {}, + "inputs": {"vrf": "default", "routes": ["10.1.0.1", "10.1.0.2"], "collect": "not-valid"}, + "expected": {"result": "error", "messages": ["Inputs are not valid"]}, + }, ] From 2258078a282087b29efccca1b06f8cb608cc1943 Mon Sep 17 00:00:00 2001 From: vitthalmagadum <122079046+vitthalmagadum@users.noreply.github.com> Date: Thu, 15 Aug 2024 03:32:02 +0530 Subject: [PATCH 2/7] feat(anta): Added the test case to verify Update error counters for BGP neighbors (#775) --- anta/custom_types.py | 1 + anta/tests/routing/bgp.py | 93 +++++- examples/tests.yaml | 7 + tests/units/anta_tests/routing/test_bgp.py | 319 +++++++++++++++++++++ 4 files changed, 419 insertions(+), 1 deletion(-) diff --git a/anta/custom_types.py b/anta/custom_types.py index a0a0631d0..8a9070579 100644 --- a/anta/custom_types.py +++ b/anta/custom_types.py @@ -167,3 +167,4 @@ def validate_regex(value: str) -> str: Hostname = Annotated[str, Field(pattern=REGEXP_TYPE_HOSTNAME)] Port = Annotated[int, Field(ge=1, le=65535)] RegexString = Annotated[str, AfterValidator(validate_regex)] +BgpUpdateError = Literal["inUpdErrWithdraw", "inUpdErrIgnore", "inUpdErrDisableAfiSafi", "disabledAfiSafi", "lastUpdErrTime"] diff --git a/anta/tests/routing/bgp.py b/anta/tests/routing/bgp.py index 7bd39ddcc..68225a6c9 100644 --- a/anta/tests/routing/bgp.py +++ b/anta/tests/routing/bgp.py @@ -14,7 +14,7 @@ from pydantic.v1.utils import deep_update from pydantic_extra_types.mac_address import MacAddress -from anta.custom_types import Afi, MultiProtocolCaps, Safi, Vni +from anta.custom_types import Afi, BgpUpdateError, MultiProtocolCaps, Safi, Vni from anta.models import AntaCommand, AntaTemplate, AntaTest from anta.tools import get_item, get_value @@ -1226,3 +1226,94 @@ def test(self) -> None: self.result.is_success() else: self.result.is_failure(f"Following BGP peers are not configured or hold and keep-alive timers are not correct:\n{failures}") + + +class VerifyBGPPeerUpdateErrors(AntaTest): + """Verifies BGP update error counters for the provided BGP IPv4 peer(s). + + By default, all update error counters will be checked for any non-zero values. + An optional list of specific update error counters can be provided for granular testing. + + Note: For "disabledAfiSafi" error counter field, checking that it's not "None" versus 0. + + Expected Results + ---------------- + * Success: The test will pass if the BGP peer's update error counter(s) are zero/None. + * Failure: The test will fail if the BGP peer's update error counter(s) are non-zero/not None/Not Found or + peer is not configured. + + Examples + -------- + ```yaml + anta.tests.routing: + bgp: + - VerifyBGPPeerUpdateErrors: + bgp_peers: + - peer_address: 172.30.11.1 + vrf: default + update_error_filter: + - inUpdErrWithdraw + ``` + """ + + name = "VerifyBGPPeerUpdateErrors" + description = "Verifies the update error counters of a BGP IPv4 peer." + categories: ClassVar[list[str]] = ["bgp"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaTemplate(template="show bgp neighbors {peer} vrf {vrf}", revision=3)] + + class Input(AntaTest.Input): + """Input model for the VerifyBGPPeerUpdateErrors test.""" + + bgp_peers: list[BgpPeer] + """List of BGP peers""" + + class BgpPeer(BaseModel): + """Model for a BGP peer.""" + + peer_address: IPv4Address + """IPv4 address of a BGP peer.""" + vrf: str = "default" + """Optional VRF for BGP peer. If not provided, it defaults to `default`.""" + update_errors: list[BgpUpdateError] | None = None + """Optional list of update error counters to be verified. If not provided, test will verifies all the update error counters.""" + + def render(self, template: AntaTemplate) -> list[AntaCommand]: + """Render the template for each BGP peer in the input list.""" + return [template.render(peer=str(bgp_peer.peer_address), vrf=bgp_peer.vrf) for bgp_peer in self.inputs.bgp_peers] + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifyBGPPeerUpdateErrors.""" + failures: dict[Any, Any] = {} + + for command, input_entry in zip(self.instance_commands, self.inputs.bgp_peers): + peer = command.params.peer + vrf = command.params.vrf + update_error_counters = input_entry.update_errors + + # Verify BGP peer. + if not (peer_list := get_value(command.json_output, f"vrfs.{vrf}.peerList")) or (peer_detail := get_item(peer_list, "peerAddress", peer)) is None: + failures[peer] = {vrf: "Not configured"} + continue + + # Getting the BGP peer's error counters output. + error_counters_output = peer_detail.get("peerInUpdateErrors", {}) + + # In case update error counters not provided, It will check all the update error counters. + if not update_error_counters: + update_error_counters = error_counters_output + + # verifying the error counters. + error_counters_not_ok = { + ("disabledAfiSafi" if error_counter == "disabledAfiSafi" else error_counter): value + for error_counter in update_error_counters + if (value := error_counters_output.get(error_counter, "Not Found")) != "None" and value != 0 + } + if error_counters_not_ok: + failures[peer] = {vrf: error_counters_not_ok} + + # Check if any failures + if not failures: + self.result.is_success() + else: + self.result.is_failure(f"The following BGP peers are not configured or have non-zero update error counters:\n{failures}") diff --git a/examples/tests.yaml b/examples/tests.yaml index c479c8739..4386d08a9 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -567,6 +567,13 @@ anta.tests.routing: vrf: default hold_time: 180 keep_alive_time: 60 + - VerifyBGPPeerUpdateErrors: + bgp_peers: + - peer_address: 10.100.0.8 + vrf: default + update_errors: + - inUpdErrWithdraw + - inUpdErrIgnore ospf: - VerifyOSPFNeighborState: - VerifyOSPFNeighborCount: diff --git a/tests/units/anta_tests/routing/test_bgp.py b/tests/units/anta_tests/routing/test_bgp.py index e712e12a8..34f83ff66 100644 --- a/tests/units/anta_tests/routing/test_bgp.py +++ b/tests/units/anta_tests/routing/test_bgp.py @@ -19,6 +19,7 @@ VerifyBGPPeerMPCaps, VerifyBGPPeerRouteRefreshCap, VerifyBGPPeersHealth, + VerifyBGPPeerUpdateErrors, VerifyBGPSpecificPeers, VerifyBGPTimers, VerifyEVPNType2Route, @@ -3722,4 +3723,322 @@ ], }, }, + { + "name": "success", + "test": VerifyBGPPeerUpdateErrors, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "peerInUpdateErrors": { + "inUpdErrWithdraw": 0, + "inUpdErrIgnore": 0, + "inUpdErrDisableAfiSafi": 0, + "disabledAfiSafi": "None", + "lastUpdErrTime": 0, + }, + } + ] + }, + }, + }, + { + "vrfs": { + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "peerInUpdateErrors": { + "inUpdErrWithdraw": 0, + "inUpdErrIgnore": 0, + "inUpdErrDisableAfiSafi": 0, + "disabledAfiSafi": "None", + "lastUpdErrTime": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default", "update_errors": ["inUpdErrWithdraw", "inUpdErrIgnore", "disabledAfiSafi"]}, + {"peer_address": "10.100.0.9", "vrf": "MGMT", "update_errors": ["inUpdErrWithdraw", "inUpdErrIgnore", "disabledAfiSafi"]}, + ] + }, + "expected": {"result": "success"}, + }, + { + "name": "failure-not-found", + "test": VerifyBGPPeerUpdateErrors, + "eos_data": [ + {"vrfs": {}}, + {"vrfs": {}}, + ], + "inputs": { + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default", "update_errors": ["inUpdErrWithdraw", "inUpdErrIgnore", "disabledAfiSafi"]}, + {"peer_address": "10.100.0.9", "vrf": "MGMT", "update_errors": ["inUpdErrWithdraw", "inUpdErrIgnore", "disabledAfiSafi"]}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "The following BGP peers are not configured or have non-zero update error counters:\n" + "{'10.100.0.8': {'default': 'Not configured'}, '10.100.0.9': {'MGMT': 'Not configured'}}" + ], + }, + }, + { + "name": "failure-errors", + "test": VerifyBGPPeerUpdateErrors, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "peerInUpdateErrors": { + "inUpdErrWithdraw": 0, + "inUpdErrIgnore": 0, + "inUpdErrDisableAfiSafi": 0, + "disabledAfiSafi": "ipv4Unicast", + "lastUpdErrTime": 0, + }, + } + ] + }, + }, + }, + { + "vrfs": { + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "peerInUpdateErrors": { + "inUpdErrWithdraw": 1, + "inUpdErrIgnore": 0, + "inUpdErrDisableAfiSafi": 0, + "disabledAfiSafi": "None", + "lastUpdErrTime": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default", "update_errors": ["inUpdErrWithdraw", "inUpdErrIgnore", "disabledAfiSafi"]}, + {"peer_address": "10.100.0.9", "vrf": "MGMT", "update_errors": ["inUpdErrWithdraw", "inUpdErrIgnore", "disabledAfiSafi"]}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "The following BGP peers are not configured or have non-zero update error counters:\n" + "{'10.100.0.8': {'default': {'disabledAfiSafi': 'ipv4Unicast'}}, " + "'10.100.0.9': {'MGMT': {'inUpdErrWithdraw': 1}}}" + ], + }, + }, + { + "name": "failure-not-found", + "test": VerifyBGPPeerUpdateErrors, + "eos_data": [ + { + "vrfs": {}, + }, + { + "vrfs": {}, + }, + ], + "inputs": { + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default", "update_errors": ["inUpdErrWithdraw", "inUpdErrIgnore", "disabledAfiSafi"]}, + {"peer_address": "10.100.0.9", "vrf": "MGMT", "update_errors": ["inUpdErrWithdraw", "inUpdErrIgnore", "disabledAfiSafi"]}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "The following BGP peers are not configured or have non-zero update error counters:\n" + "{'10.100.0.8': {'default': 'Not configured'}, '10.100.0.9': {'MGMT': 'Not configured'}}" + ], + }, + }, + { + "name": "success-all-error-counters", + "test": VerifyBGPPeerUpdateErrors, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "peerInUpdateErrors": { + "inUpdErrWithdraw": 0, + "inUpdErrIgnore": 0, + "inUpdErrDisableAfiSafi": 0, + "disabledAfiSafi": "None", + "lastUpdErrTime": 0, + }, + } + ] + }, + }, + }, + { + "vrfs": { + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "peerInUpdateErrors": { + "inUpdErrWithdraw": 0, + "inUpdErrIgnore": 0, + "inUpdErrDisableAfiSafi": 0, + "disabledAfiSafi": "None", + "lastUpdErrTime": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default"}, + {"peer_address": "10.100.0.9", "vrf": "MGMT"}, + ] + }, + "expected": {"result": "success"}, + }, + { + "name": "failure-all-error-counters", + "test": VerifyBGPPeerUpdateErrors, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "peerInUpdateErrors": { + "inUpdErrWithdraw": 1, + "inUpdErrIgnore": 0, + "inUpdErrDisableAfiSafi": 0, + "disabledAfiSafi": "ipv4Unicast", + "lastUpdErrTime": 0, + }, + } + ] + }, + }, + }, + { + "vrfs": { + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "peerInUpdateErrors": { + "inUpdErrWithdraw": 1, + "inUpdErrIgnore": 0, + "inUpdErrDisableAfiSafi": 1, + "disabledAfiSafi": "None", + "lastUpdErrTime": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default", "update_errors": ["inUpdErrWithdraw", "inUpdErrIgnore", "disabledAfiSafi"]}, + { + "peer_address": "10.100.0.9", + "vrf": "MGMT", + "update_errors": ["inUpdErrWithdraw", "inUpdErrIgnore", "disabledAfiSafi", "inUpdErrDisableAfiSafi"], + }, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "The following BGP peers are not configured or have non-zero update error counters:\n" + "{'10.100.0.8': {'default': {'inUpdErrWithdraw': 1, 'disabledAfiSafi': 'ipv4Unicast'}}, " + "'10.100.0.9': {'MGMT': {'inUpdErrWithdraw': 1, 'inUpdErrDisableAfiSafi': 1}}}" + ], + }, + }, + { + "name": "failure-all-not-found", + "test": VerifyBGPPeerUpdateErrors, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "peerInUpdateErrors": { + "inUpdErrIgnore": 0, + "inUpdErrDisableAfiSafi": 0, + "disabledAfiSafi": "ipv4Unicast", + "lastUpdErrTime": 0, + }, + } + ] + }, + }, + }, + { + "vrfs": { + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "peerInUpdateErrors": { + "inUpdErrWithdraw": 1, + "inUpdErrIgnore": 0, + "disabledAfiSafi": "None", + "lastUpdErrTime": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default", "update_errors": ["inUpdErrWithdraw", "inUpdErrIgnore", "disabledAfiSafi"]}, + { + "peer_address": "10.100.0.9", + "vrf": "MGMT", + "update_errors": ["inUpdErrWithdraw", "inUpdErrIgnore", "disabledAfiSafi", "inUpdErrDisableAfiSafi"], + }, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "The following BGP peers are not configured or have non-zero update error counters:\n" + "{'10.100.0.8': {'default': {'inUpdErrWithdraw': 'Not Found', 'disabledAfiSafi': 'ipv4Unicast'}}, " + "'10.100.0.9': {'MGMT': {'inUpdErrWithdraw': 1, 'inUpdErrDisableAfiSafi': 'Not Found'}}}" + ], + }, + }, ] From c37c089e37bf90388a61d18475c2e089828be27c Mon Sep 17 00:00:00 2001 From: vitthalmagadum <122079046+vitthalmagadum@users.noreply.github.com> Date: Thu, 15 Aug 2024 06:46:53 +0530 Subject: [PATCH 3/7] feat(anta): Added the test case to verify Inbound/outbound stats for BGP neighbors (#778) --- anta/custom_types.py | 30 ++ anta/tests/routing/bgp.py | 89 +++++- examples/tests.yaml | 11 + tests/units/anta_tests/routing/test_bgp.py | 316 +++++++++++++++++++++ 4 files changed, 445 insertions(+), 1 deletion(-) diff --git a/anta/custom_types.py b/anta/custom_types.py index 8a9070579..56c213977 100644 --- a/anta/custom_types.py +++ b/anta/custom_types.py @@ -167,4 +167,34 @@ def validate_regex(value: str) -> str: Hostname = Annotated[str, Field(pattern=REGEXP_TYPE_HOSTNAME)] Port = Annotated[int, Field(ge=1, le=65535)] RegexString = Annotated[str, AfterValidator(validate_regex)] +BgpDropStats = Literal[ + "inDropAsloop", + "inDropClusterIdLoop", + "inDropMalformedMpbgp", + "inDropOrigId", + "inDropNhLocal", + "inDropNhAfV6", + "prefixDroppedMartianV4", + "prefixDroppedMaxRouteLimitViolatedV4", + "prefixDroppedMartianV6", + "prefixDroppedMaxRouteLimitViolatedV6", + "prefixLuDroppedV4", + "prefixLuDroppedMartianV4", + "prefixLuDroppedMaxRouteLimitViolatedV4", + "prefixLuDroppedV6", + "prefixLuDroppedMartianV6", + "prefixLuDroppedMaxRouteLimitViolatedV6", + "prefixEvpnDroppedUnsupportedRouteType", + "prefixBgpLsDroppedReceptionUnsupported", + "outDropV4LocalAddr", + "outDropV6LocalAddr", + "prefixVpnIpv4DroppedImportMatchFailure", + "prefixVpnIpv4DroppedMaxRouteLimitViolated", + "prefixVpnIpv6DroppedImportMatchFailure", + "prefixVpnIpv6DroppedMaxRouteLimitViolated", + "prefixEvpnDroppedImportMatchFailure", + "prefixEvpnDroppedMaxRouteLimitViolated", + "prefixRtMembershipDroppedLocalAsReject", + "prefixRtMembershipDroppedMaxRouteLimitViolated", +] BgpUpdateError = Literal["inUpdErrWithdraw", "inUpdErrIgnore", "inUpdErrDisableAfiSafi", "disabledAfiSafi", "lastUpdErrTime"] diff --git a/anta/tests/routing/bgp.py b/anta/tests/routing/bgp.py index 68225a6c9..6a7002356 100644 --- a/anta/tests/routing/bgp.py +++ b/anta/tests/routing/bgp.py @@ -14,7 +14,7 @@ from pydantic.v1.utils import deep_update from pydantic_extra_types.mac_address import MacAddress -from anta.custom_types import Afi, BgpUpdateError, MultiProtocolCaps, Safi, Vni +from anta.custom_types import Afi, BgpDropStats, BgpUpdateError, MultiProtocolCaps, Safi, Vni from anta.models import AntaCommand, AntaTemplate, AntaTest from anta.tools import get_item, get_value @@ -1228,6 +1228,93 @@ def test(self) -> None: self.result.is_failure(f"Following BGP peers are not configured or hold and keep-alive timers are not correct:\n{failures}") +class VerifyBGPPeerDropStats(AntaTest): + """Verifies BGP NLRI drop statistics for the provided BGP IPv4 peer(s). + + By default, all drop statistics counters will be checked for any non-zero values. + An optional list of specific drop statistics can be provided for granular testing. + + Expected Results + ---------------- + * Success: The test will pass if the BGP peer's drop statistic(s) are zero. + * Failure: The test will fail if the BGP peer's drop statistic(s) are non-zero/Not Found or peer is not configured. + + Examples + -------- + ```yaml + anta.tests.routing: + bgp: + - VerifyBGPPeerDropStats: + bgp_peers: + - peer_address: 172.30.11.1 + vrf: default + drop_stats: + - inDropAsloop + - prefixEvpnDroppedUnsupportedRouteType + ``` + """ + + name = "VerifyBGPPeerDropStats" + description = "Verifies the NLRI drop statistics of a BGP IPv4 peer(s)." + categories: ClassVar[list[str]] = ["bgp"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaTemplate(template="show bgp neighbors {peer} vrf {vrf}", revision=3)] + + class Input(AntaTest.Input): + """Input model for the VerifyBGPPeerDropStats test.""" + + bgp_peers: list[BgpPeer] + """List of BGP peers""" + + class BgpPeer(BaseModel): + """Model for a BGP peer.""" + + peer_address: IPv4Address + """IPv4 address of a BGP peer.""" + vrf: str = "default" + """Optional VRF for BGP peer. If not provided, it defaults to `default`.""" + drop_stats: list[BgpDropStats] | None = None + """Optional list of drop statistics to be verified. If not provided, test will verifies all the drop statistics.""" + + def render(self, template: AntaTemplate) -> list[AntaCommand]: + """Render the template for each BGP peer in the input list.""" + return [template.render(peer=str(bgp_peer.peer_address), vrf=bgp_peer.vrf) for bgp_peer in self.inputs.bgp_peers] + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifyBGPPeerDropStats.""" + failures: dict[Any, Any] = {} + + for command, input_entry in zip(self.instance_commands, self.inputs.bgp_peers): + peer = command.params.peer + vrf = command.params.vrf + drop_statistics = input_entry.drop_stats + + # Verify BGP peer + if not (peer_list := get_value(command.json_output, f"vrfs.{vrf}.peerList")) or (peer_detail := get_item(peer_list, "peerAddress", peer)) is None: + failures[peer] = {vrf: "Not configured"} + continue + + # Verify BGP peer's drop stats + drop_stats_output = peer_detail.get("dropStats", {}) + + # In case drop stats not provided, It will check all drop statistics + if not drop_statistics: + drop_statistics = drop_stats_output + + # Verify BGP peer's drop stats + drop_stats_not_ok = { + drop_stat: drop_stats_output.get(drop_stat, "Not Found") for drop_stat in drop_statistics if drop_stats_output.get(drop_stat, "Not Found") + } + if any(drop_stats_not_ok): + failures[peer] = {vrf: drop_stats_not_ok} + + # Check if any failures + if not failures: + self.result.is_success() + else: + self.result.is_failure(f"The following BGP peers are not configured or have non-zero NLRI drop statistics counters:\n{failures}") + + class VerifyBGPPeerUpdateErrors(AntaTest): """Verifies BGP update error counters for the provided BGP IPv4 peer(s). diff --git a/examples/tests.yaml b/examples/tests.yaml index 4386d08a9..c4248cf75 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -567,6 +567,17 @@ anta.tests.routing: vrf: default hold_time: 180 keep_alive_time: 60 + - VerifyBGPPeerDropStats: + bgp_peers: + - peer_address: 10.101.0.4 + vrf: default + drop_stats: + - inDropAsloop + - inDropClusterIdLoop + - inDropMalformedMpbgp + - inDropOrigId + - inDropNhLocal + - inDropNhAfV6 - VerifyBGPPeerUpdateErrors: bgp_peers: - peer_address: 10.100.0.8 diff --git a/tests/units/anta_tests/routing/test_bgp.py b/tests/units/anta_tests/routing/test_bgp.py index 34f83ff66..47db8e60b 100644 --- a/tests/units/anta_tests/routing/test_bgp.py +++ b/tests/units/anta_tests/routing/test_bgp.py @@ -15,6 +15,7 @@ VerifyBGPExchangedRoutes, VerifyBGPPeerASNCap, VerifyBGPPeerCount, + VerifyBGPPeerDropStats, VerifyBGPPeerMD5Auth, VerifyBGPPeerMPCaps, VerifyBGPPeerRouteRefreshCap, @@ -3723,6 +3724,321 @@ ], }, }, + { + "name": "success", + "test": VerifyBGPPeerDropStats, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "dropStats": { + "inDropAsloop": 0, + "inDropClusterIdLoop": 0, + "inDropMalformedMpbgp": 0, + "inDropOrigId": 0, + "inDropNhLocal": 0, + "inDropNhAfV6": 0, + "prefixDroppedMartianV4": 0, + "prefixDroppedMaxRouteLimitViolatedV4": 0, + "prefixDroppedMartianV6": 0, + }, + } + ] + }, + }, + }, + { + "vrfs": { + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "dropStats": { + "inDropAsloop": 0, + "inDropClusterIdLoop": 0, + "inDropMalformedMpbgp": 0, + "inDropOrigId": 0, + "inDropNhLocal": 0, + "inDropNhAfV6": 0, + "prefixDroppedMartianV4": 0, + "prefixDroppedMaxRouteLimitViolatedV4": 0, + "prefixDroppedMartianV6": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "bgp_peers": [ + { + "peer_address": "10.100.0.8", + "vrf": "default", + "drop_stats": ["prefixDroppedMartianV4", "prefixDroppedMaxRouteLimitViolatedV4", "prefixDroppedMartianV6"], + }, + {"peer_address": "10.100.0.9", "vrf": "MGMT", "drop_stats": ["inDropClusterIdLoop", "inDropOrigId", "inDropNhLocal"]}, + ] + }, + "expected": {"result": "success"}, + }, + { + "name": "failure-not-found", + "test": VerifyBGPPeerDropStats, + "eos_data": [ + {"vrfs": {}}, + {"vrfs": {}}, + ], + "inputs": { + "bgp_peers": [ + { + "peer_address": "10.100.0.8", + "vrf": "default", + "drop_stats": ["prefixDroppedMartianV4", "prefixDroppedMaxRouteLimitViolatedV4", "prefixDroppedMartianV6"], + }, + {"peer_address": "10.100.0.9", "vrf": "MGMT", "drop_stats": ["inDropClusterIdLoop", "inDropOrigId", "inDropNhLocal"]}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "The following BGP peers are not configured or have non-zero NLRI drop statistics counters:\n" + "{'10.100.0.8': {'default': 'Not configured'}, '10.100.0.9': {'MGMT': 'Not configured'}}" + ], + }, + }, + { + "name": "failure", + "test": VerifyBGPPeerDropStats, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "dropStats": { + "inDropAsloop": 0, + "inDropClusterIdLoop": 0, + "inDropMalformedMpbgp": 0, + "inDropOrigId": 1, + "inDropNhLocal": 1, + "inDropNhAfV6": 0, + "prefixDroppedMartianV4": 1, + "prefixDroppedMaxRouteLimitViolatedV4": 1, + "prefixDroppedMartianV6": 0, + }, + } + ] + }, + }, + }, + { + "vrfs": { + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "dropStats": { + "inDropAsloop": 0, + "inDropClusterIdLoop": 0, + "inDropMalformedMpbgp": 0, + "inDropOrigId": 1, + "inDropNhLocal": 1, + "inDropNhAfV6": 0, + "prefixDroppedMartianV4": 0, + "prefixDroppedMaxRouteLimitViolatedV4": 0, + "prefixDroppedMartianV6": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "bgp_peers": [ + { + "peer_address": "10.100.0.8", + "vrf": "default", + "drop_stats": ["prefixDroppedMartianV4", "prefixDroppedMaxRouteLimitViolatedV4", "prefixDroppedMartianV6"], + }, + {"peer_address": "10.100.0.9", "vrf": "MGMT", "drop_stats": ["inDropClusterIdLoop", "inDropOrigId", "inDropNhLocal"]}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "The following BGP peers are not configured or have non-zero NLRI drop statistics counters:\n" + "{'10.100.0.8': {'default': {'prefixDroppedMartianV4': 1, 'prefixDroppedMaxRouteLimitViolatedV4': 1}}, " + "'10.100.0.9': {'MGMT': {'inDropOrigId': 1, 'inDropNhLocal': 1}}}" + ], + }, + }, + { + "name": "success-all-drop-stats", + "test": VerifyBGPPeerDropStats, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "dropStats": { + "inDropAsloop": 0, + "inDropClusterIdLoop": 0, + "inDropMalformedMpbgp": 0, + "inDropOrigId": 0, + "inDropNhLocal": 0, + "inDropNhAfV6": 0, + "prefixDroppedMartianV4": 0, + "prefixDroppedMaxRouteLimitViolatedV4": 0, + "prefixDroppedMartianV6": 0, + }, + } + ] + }, + }, + }, + { + "vrfs": { + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "dropStats": { + "inDropAsloop": 0, + "inDropClusterIdLoop": 0, + "inDropMalformedMpbgp": 0, + "inDropOrigId": 0, + "inDropNhLocal": 0, + "inDropNhAfV6": 0, + "prefixDroppedMartianV4": 0, + "prefixDroppedMaxRouteLimitViolatedV4": 0, + "prefixDroppedMartianV6": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default"}, + {"peer_address": "10.100.0.9", "vrf": "MGMT"}, + ] + }, + "expected": {"result": "success"}, + }, + { + "name": "failure-all-drop-stats", + "test": VerifyBGPPeerDropStats, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "dropStats": { + "inDropAsloop": 3, + "inDropClusterIdLoop": 0, + "inDropMalformedMpbgp": 0, + "inDropOrigId": 1, + "inDropNhLocal": 1, + "inDropNhAfV6": 0, + "prefixDroppedMartianV4": 1, + "prefixDroppedMaxRouteLimitViolatedV4": 1, + "prefixDroppedMartianV6": 0, + }, + } + ] + }, + }, + }, + { + "vrfs": { + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "dropStats": { + "inDropAsloop": 2, + "inDropClusterIdLoop": 0, + "inDropMalformedMpbgp": 0, + "inDropOrigId": 1, + "inDropNhLocal": 1, + "inDropNhAfV6": 0, + "prefixDroppedMartianV4": 0, + "prefixDroppedMaxRouteLimitViolatedV4": 0, + "prefixDroppedMartianV6": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default"}, + {"peer_address": "10.100.0.9", "vrf": "MGMT"}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "The following BGP peers are not configured or have non-zero NLRI drop statistics counters:\n" + "{'10.100.0.8': {'default': {'inDropAsloop': 3, 'inDropOrigId': 1, 'inDropNhLocal': 1, " + "'prefixDroppedMartianV4': 1, 'prefixDroppedMaxRouteLimitViolatedV4': 1}}, " + "'10.100.0.9': {'MGMT': {'inDropAsloop': 2, 'inDropOrigId': 1, 'inDropNhLocal': 1}}}" + ], + }, + }, + { + "name": "failure-drop-stat-not-found", + "test": VerifyBGPPeerDropStats, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "dropStats": { + "inDropAsloop": 3, + "inDropClusterIdLoop": 0, + "inDropMalformedMpbgp": 0, + "inDropOrigId": 1, + "inDropNhLocal": 1, + "inDropNhAfV6": 0, + "prefixDroppedMaxRouteLimitViolatedV4": 1, + "prefixDroppedMartianV6": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default", "drop_stats": ["inDropAsloop", "inDropOrigId", "inDropNhLocal", "prefixDroppedMartianV4"]} + ] + }, + "expected": { + "result": "failure", + "messages": [ + "The following BGP peers are not configured or have non-zero NLRI drop statistics counters:\n" + "{'10.100.0.8': {'default': {'inDropAsloop': 3, 'inDropOrigId': 1, 'inDropNhLocal': 1, 'prefixDroppedMartianV4': 'Not Found'}}}" + ], + }, + }, { "name": "success", "test": VerifyBGPPeerUpdateErrors, From e9925d351c1515a56e762b51129534aae6c2f7c7 Mon Sep 17 00:00:00 2001 From: Carl Baillargeon Date: Mon, 19 Aug 2024 13:29:53 -0400 Subject: [PATCH 4/7] fix(anta): Add upper bound on Griffe requirement for v1 (#794) --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 6cafa5b7f..ecfcba289 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,7 +81,7 @@ dev = [ ] doc = [ "fontawesome_markdown", - "griffe", + "griffe >=0.46,<1.0.0", "mike==2.1.3", "mkdocs-autorefs>=0.4.1", "mkdocs-bootswatch>=1.1", From 08e945b5bf1cc110e4200cf45a391c7b284da6ef Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 19 Aug 2024 13:40:37 -0400 Subject: [PATCH 5/7] chore: update ruff requirement from <0.6.0,>=0.5.4 to >=0.5.4,<0.7.0 (#790) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Carl Baillargeon --- .pre-commit-config.yaml | 2 +- asynceapi/aio_portcheck.py | 2 +- pyproject.toml | 2 +- tests/lib/fixture.py | 16 ++++++++-------- tests/units/cli/exec/test_utils.py | 2 +- tests/units/test_device.py | 8 ++++---- tests/units/test_runner.py | 12 ++++++------ 7 files changed, 22 insertions(+), 22 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6f632b99f..75d4388a1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -43,7 +43,7 @@ repos: - '' - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.5.7 + rev: v0.6.1 hooks: - id: ruff name: Run Ruff linter diff --git a/asynceapi/aio_portcheck.py b/asynceapi/aio_portcheck.py index 79f4562fa..fd8e7aee2 100644 --- a/asynceapi/aio_portcheck.py +++ b/asynceapi/aio_portcheck.py @@ -33,7 +33,7 @@ # ----------------------------------------------------------------------------- -async def port_check_url(url: URL, timeout: int = 5) -> bool: +async def port_check_url(url: URL, timeout: int = 5) -> bool: # noqa: ASYNC109 """ Open the port designated by the URL given the timeout in seconds. diff --git a/pyproject.toml b/pyproject.toml index ecfcba289..e64ee80df 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,7 +71,7 @@ dev = [ "pytest-html>=3.2.0", "pytest-metadata>=3.0.0", "pytest>=7.4.0", - "ruff>=0.5.4,<0.6.0", + "ruff>=0.5.4,<0.7.0", "tox>=4.10.0,<5.0.0", "types-PyYAML", "types-pyOpenSSL", diff --git a/tests/lib/fixture.py b/tests/lib/fixture.py index 17943edc3..b0205b8bb 100644 --- a/tests/lib/fixture.py +++ b/tests/lib/fixture.py @@ -58,7 +58,7 @@ } -@pytest.fixture() +@pytest.fixture def device(request: pytest.FixtureRequest) -> Iterator[AntaDevice]: """Return an AntaDevice instance with mocked abstract method.""" @@ -78,7 +78,7 @@ def _collect(command: AntaCommand, *args: Any, **kwargs: Any) -> None: # noqa: yield dev -@pytest.fixture() +@pytest.fixture def test_inventory() -> AntaInventory: """Return the test_inventory.""" env = default_anta_env() @@ -93,7 +93,7 @@ def test_inventory() -> AntaInventory: # tests.unit.test_device.py fixture -@pytest.fixture() +@pytest.fixture def async_device(request: pytest.FixtureRequest) -> AsyncEOSDevice: """Return an AsyncEOSDevice instance.""" kwargs = { @@ -110,7 +110,7 @@ def async_device(request: pytest.FixtureRequest) -> AsyncEOSDevice: # tests.units.result_manager fixtures -@pytest.fixture() +@pytest.fixture def test_result_factory(device: AntaDevice) -> Callable[[int], TestResult]: """Return a anta.result_manager.models.TestResult object.""" # pylint: disable=redefined-outer-name @@ -128,7 +128,7 @@ def _create(index: int = 0) -> TestResult: return _create -@pytest.fixture() +@pytest.fixture def list_result_factory(test_result_factory: Callable[[int], TestResult]) -> Callable[[int], list[TestResult]]: """Return a list[TestResult] with 'size' TestResult instantiated using the test_result_factory fixture.""" # pylint: disable=redefined-outer-name @@ -140,7 +140,7 @@ def _factory(size: int = 0) -> list[TestResult]: return _factory -@pytest.fixture() +@pytest.fixture def result_manager_factory(list_result_factory: Callable[[int], list[TestResult]]) -> Callable[[int], ResultManager]: """Return a ResultManager factory that takes as input a number of tests.""" # pylint: disable=redefined-outer-name @@ -155,7 +155,7 @@ def _factory(number: int = 0) -> ResultManager: # tests.units.cli fixtures -@pytest.fixture() +@pytest.fixture def temp_env(tmp_path: Path) -> dict[str, str | None]: """Fixture that create a temporary ANTA inventory. @@ -169,7 +169,7 @@ def temp_env(tmp_path: Path) -> dict[str, str | None]: return env -@pytest.fixture() +@pytest.fixture # Disabling C901 - too complex as we like our runner like this def click_runner(capsys: pytest.CaptureFixture[str]) -> Iterator[CliRunner]: # noqa: C901 """Return a click.CliRunner for cli testing.""" diff --git a/tests/units/cli/exec/test_utils.py b/tests/units/cli/exec/test_utils.py index ad1a78ab1..f4c0cc5fd 100644 --- a/tests/units/cli/exec/test_utils.py +++ b/tests/units/cli/exec/test_utils.py @@ -23,7 +23,7 @@ # TODO: complete test cases -@pytest.mark.asyncio() +@pytest.mark.asyncio @pytest.mark.parametrize( ("inventory_state", "per_device_command_output", "tags"), [ diff --git a/tests/units/test_device.py b/tests/units/test_device.py index e8a0c5f86..d3c50cc8e 100644 --- a/tests/units/test_device.py +++ b/tests/units/test_device.py @@ -613,7 +613,7 @@ class TestAntaDevice: """Test for anta.device.AntaDevice Abstract class.""" - @pytest.mark.asyncio() + @pytest.mark.asyncio @pytest.mark.parametrize( ("device", "command_data", "expected_data"), ((d["device"], d["command"], d["expected"]) for d in COLLECT_DATA), @@ -693,7 +693,7 @@ def test__eq(self, data: dict[str, Any]) -> None: else: assert device1 != device2 - @pytest.mark.asyncio() + @pytest.mark.asyncio @pytest.mark.parametrize( ("async_device", "patch_kwargs", "expected"), ((d["device"], d["patch_kwargs"], d["expected"]) for d in REFRESH_DATA), @@ -712,7 +712,7 @@ async def test_refresh(self, async_device: AsyncEOSDevice, patch_kwargs: list[di assert async_device.established == expected["established"] assert async_device.hw_model == expected["hw_model"] - @pytest.mark.asyncio() + @pytest.mark.asyncio @pytest.mark.parametrize( ("async_device", "command", "expected"), ((d["device"], d["command"], d["expected"]) for d in ASYNCEAPI_COLLECT_DATA), @@ -745,7 +745,7 @@ async def test__collect(self, async_device: AsyncEOSDevice, command: dict[str, A assert cmd.output == expected["output"] assert cmd.errors == expected["errors"] - @pytest.mark.asyncio() + @pytest.mark.asyncio @pytest.mark.parametrize( ("async_device", "copy"), ((d["device"], d["copy"]) for d in ASYNCEAPI_COPY_DATA), diff --git a/tests/units/test_runner.py b/tests/units/test_runner.py index 955149d09..53d0bf758 100644 --- a/tests/units/test_runner.py +++ b/tests/units/test_runner.py @@ -24,7 +24,7 @@ FAKE_CATALOG: AntaCatalog = AntaCatalog.from_list([(FakeTest, None)]) -@pytest.mark.asyncio() +@pytest.mark.asyncio async def test_runner_empty_tests(caplog: pytest.LogCaptureFixture, test_inventory: AntaInventory) -> None: """Test that when the list of tests is empty, a log is raised. @@ -40,7 +40,7 @@ async def test_runner_empty_tests(caplog: pytest.LogCaptureFixture, test_invento assert "The list of tests is empty, exiting" in caplog.records[0].message -@pytest.mark.asyncio() +@pytest.mark.asyncio async def test_runner_empty_inventory(caplog: pytest.LogCaptureFixture) -> None: """Test that when the Inventory is empty, a log is raised. @@ -55,7 +55,7 @@ async def test_runner_empty_inventory(caplog: pytest.LogCaptureFixture) -> None: assert "The inventory is empty, exiting" in caplog.records[1].message -@pytest.mark.asyncio() +@pytest.mark.asyncio async def test_runner_no_selected_device(caplog: pytest.LogCaptureFixture, test_inventory: AntaInventory) -> None: """Test that when the list of established device. @@ -140,7 +140,7 @@ def side_effect_setrlimit(resource_id: int, limits: tuple[int, int]) -> None: setrlimit_mock.assert_called_once_with(resource.RLIMIT_NOFILE, (16384, 1048576)) -@pytest.mark.asyncio() +@pytest.mark.asyncio @pytest.mark.parametrize( ("tags", "expected_tests_count", "expected_devices_count"), [ @@ -173,7 +173,7 @@ async def test_prepare_tests( assert sum(len(tests) for tests in selected_tests.values()) == expected_tests_count -@pytest.mark.asyncio() +@pytest.mark.asyncio async def test_prepare_tests_with_specific_tests(caplog: pytest.LogCaptureFixture, test_inventory: AntaInventory) -> None: """Test the runner prepare_tests function with specific tests.""" logger.setup_logging(logger.Log.INFO) @@ -187,7 +187,7 @@ async def test_prepare_tests_with_specific_tests(caplog: pytest.LogCaptureFixtur assert sum(len(tests) for tests in selected_tests.values()) == 5 -@pytest.mark.asyncio() +@pytest.mark.asyncio async def test_runner_dry_run(caplog: pytest.LogCaptureFixture, test_inventory: AntaInventory) -> None: """Test that when dry_run is True, no tests are run. From b9f95aebae28e9a182f0ae8b3aba8f9ee257816a Mon Sep 17 00:00:00 2001 From: vitthalmagadum <122079046+vitthalmagadum@users.noreply.github.com> Date: Wed, 21 Aug 2024 23:29:57 +0530 Subject: [PATCH 6/7] feat(anta): Added test case to verify registered protocol for IPv4 BFD peers (#773) --- anta/custom_types.py | 1 + anta/tests/bfd.py | 82 +++++++++++++++++- examples/tests.yaml | 7 ++ tests/units/anta_tests/test_bfd.py | 131 ++++++++++++++++++++++++++++- 4 files changed, 217 insertions(+), 4 deletions(-) diff --git a/anta/custom_types.py b/anta/custom_types.py index 56c213977..153fd7011 100644 --- a/anta/custom_types.py +++ b/anta/custom_types.py @@ -198,3 +198,4 @@ def validate_regex(value: str) -> str: "prefixRtMembershipDroppedMaxRouteLimitViolated", ] BgpUpdateError = Literal["inUpdErrWithdraw", "inUpdErrIgnore", "inUpdErrDisableAfiSafi", "disabledAfiSafi", "lastUpdErrTime"] +BfdProtocol = Literal["bgp", "isis", "lag", "ospf", "ospfv3", "pim", "route-input", "static-bfd", "static-route", "vrrp", "vxlan"] diff --git a/anta/tests/bfd.py b/anta/tests/bfd.py index f19e9cc92..0b171a6d2 100644 --- a/anta/tests/bfd.py +++ b/anta/tests/bfd.py @@ -13,7 +13,7 @@ from pydantic import BaseModel, Field -from anta.custom_types import BfdInterval, BfdMultiplier +from anta.custom_types import BfdInterval, BfdMultiplier, BfdProtocol from anta.models import AntaCommand, AntaTest from anta.tools import get_value @@ -45,7 +45,7 @@ class VerifyBFDSpecificPeers(AntaTest): name = "VerifyBFDSpecificPeers" description = "Verifies the IPv4 BFD peer's sessions and remote disc in the specified VRF." categories: ClassVar[list[str]] = ["bfd"] - commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers", revision=4)] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers", revision=1)] class Input(AntaTest.Input): """Input model for the VerifyBFDSpecificPeers test.""" @@ -126,7 +126,7 @@ class VerifyBFDPeersIntervals(AntaTest): name = "VerifyBFDPeersIntervals" description = "Verifies the timers of the IPv4 BFD peers in the specified VRF." categories: ClassVar[list[str]] = ["bfd"] - commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers detail", revision=4)] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers detail", revision=1)] class Input(AntaTest.Input): """Input model for the VerifyBFDPeersIntervals test.""" @@ -285,3 +285,79 @@ def test(self) -> None: if up_failures: up_failures_str = "\n".join(up_failures) self.result.is_failure(f"\nFollowing BFD peers were down:\n{up_failures_str}") + + +class VerifyBFDPeersRegProtocols(AntaTest): + """Verifies that IPv4 BFD peer(s) have the specified protocol(s) registered. + + Expected Results + ---------------- + * Success: The test will pass if IPv4 BFD peers are registered with the specified protocol(s). + * Failure: The test will fail if IPv4 BFD peers are not found or the specified protocol(s) are not registered for the BFD peer(s). + + Examples + -------- + ```yaml + anta.tests.bfd: + - VerifyBFDPeersRegProtocols: + bfd_peers: + - peer_address: 192.0.255.7 + vrf: default + protocols: + - bgp + ``` + """ + + name = "VerifyBFDPeersRegProtocols" + description = "Verifies that IPv4 BFD peer(s) have the specified protocol(s) registered." + categories: ClassVar[list[str]] = ["bfd"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers detail", revision=1)] + + class Input(AntaTest.Input): + """Input model for the VerifyBFDPeersRegProtocols test.""" + + bfd_peers: list[BFDPeer] + """List of IPv4 BFD peers.""" + + class BFDPeer(BaseModel): + """Model for an IPv4 BFD peer.""" + + peer_address: IPv4Address + """IPv4 address of a BFD peer.""" + vrf: str = "default" + """Optional VRF for BFD peer. If not provided, it defaults to `default`.""" + protocols: list[BfdProtocol] + """List of protocols to be verified.""" + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifyBFDPeersRegProtocols.""" + # Initialize failure messages + failures: dict[Any, Any] = {} + + # Iterating over BFD peers, extract the parameters and command output + for bfd_peer in self.inputs.bfd_peers: + peer = str(bfd_peer.peer_address) + vrf = bfd_peer.vrf + protocols = bfd_peer.protocols + bfd_output = get_value( + self.instance_commands[0].json_output, + f"vrfs..{vrf}..ipv4Neighbors..{peer}..peerStats..", + separator="..", + ) + + # Check if BFD peer configured + if not bfd_output: + failures[peer] = {vrf: "Not Configured"} + continue + + # Check registered protocols + difference = set(protocols) - set(get_value(bfd_output, "peerStatsDetail.apps")) + + if difference: + failures[peer] = {vrf: sorted(difference)} + + if not failures: + self.result.is_success() + else: + self.result.is_failure(f"The following BFD peers are not configured or have non-registered protocol(s):\n{failures}") diff --git a/examples/tests.yaml b/examples/tests.yaml index c4248cf75..58161972f 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -83,6 +83,13 @@ anta.tests.bfd: multiplier: 3 - VerifyBFDPeersHealth: down_threshold: 2 + - VerifyBFDPeersRegProtocols: + bfd_peers: + - peer_address: 192.0.255.8 + vrf: default + protocols: + - bgp + - isis anta.tests.configuration: - VerifyZeroTouch: diff --git a/tests/units/anta_tests/test_bfd.py b/tests/units/anta_tests/test_bfd.py index 54dc7a05e..b3ab5609a 100644 --- a/tests/units/anta_tests/test_bfd.py +++ b/tests/units/anta_tests/test_bfd.py @@ -10,7 +10,7 @@ # pylint: disable=C0413 # because of the patch above -from anta.tests.bfd import VerifyBFDPeersHealth, VerifyBFDPeersIntervals, VerifyBFDSpecificPeers +from anta.tests.bfd import VerifyBFDPeersHealth, VerifyBFDPeersIntervals, VerifyBFDPeersRegProtocols, VerifyBFDSpecificPeers from tests.lib.anta import test # noqa: F401; pylint: disable=W0611 DATA: list[dict[str, Any]] = [ @@ -519,4 +519,133 @@ ], }, }, + { + "name": "success", + "test": VerifyBFDPeersRegProtocols, + "eos_data": [ + { + "vrfs": { + "default": { + "ipv4Neighbors": { + "192.0.255.7": { + "peerStats": { + "": { + "status": "up", + "remoteDisc": 108328132, + "peerStatsDetail": { + "role": "active", + "apps": ["ospf"], + }, + } + } + } + } + }, + "MGMT": { + "ipv4Neighbors": { + "192.0.255.70": { + "peerStats": { + "": { + "status": "up", + "remoteDisc": 108328132, + "peerStatsDetail": { + "role": "active", + "apps": ["bgp"], + }, + } + } + } + } + }, + } + } + ], + "inputs": { + "bfd_peers": [ + {"peer_address": "192.0.255.7", "vrf": "default", "protocols": ["ospf"]}, + {"peer_address": "192.0.255.70", "vrf": "MGMT", "protocols": ["bgp"]}, + ] + }, + "expected": {"result": "success"}, + }, + { + "name": "failure", + "test": VerifyBFDPeersRegProtocols, + "eos_data": [ + { + "vrfs": { + "default": { + "ipv4Neighbors": { + "192.0.255.7": { + "peerStats": { + "": { + "status": "up", + "peerStatsDetail": { + "role": "active", + "apps": ["ospf"], + }, + } + } + } + } + }, + "MGMT": { + "ipv4Neighbors": { + "192.0.255.70": { + "peerStats": { + "": { + "status": "up", + "remoteDisc": 0, + "peerStatsDetail": { + "role": "active", + "apps": ["bgp"], + }, + } + } + } + } + }, + } + } + ], + "inputs": { + "bfd_peers": [ + {"peer_address": "192.0.255.7", "vrf": "default", "protocols": ["isis"]}, + {"peer_address": "192.0.255.70", "vrf": "MGMT", "protocols": ["isis"]}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "The following BFD peers are not configured or have non-registered protocol(s):\n" + "{'192.0.255.7': {'default': ['isis']}, " + "'192.0.255.70': {'MGMT': ['isis']}}" + ], + }, + }, + { + "name": "failure-not-found", + "test": VerifyBFDPeersRegProtocols, + "eos_data": [ + { + "vrfs": { + "default": {}, + "MGMT": {}, + } + } + ], + "inputs": { + "bfd_peers": [ + {"peer_address": "192.0.255.7", "vrf": "default", "protocols": ["isis"]}, + {"peer_address": "192.0.255.70", "vrf": "MGMT", "protocols": ["isis"]}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "The following BFD peers are not configured or have non-registered protocol(s):\n" + "{'192.0.255.7': {'default': 'Not Configured'}, '192.0.255.70': {'MGMT': 'Not Configured'}}" + ], + }, + }, ] From 61e206efb5636fc6d7498bd50a3a0aacceb45ee3 Mon Sep 17 00:00:00 2001 From: vitthalmagadum <122079046+vitthalmagadum@users.noreply.github.com> Date: Thu, 22 Aug 2024 00:09:06 +0530 Subject: [PATCH 7/7] feat(anta): Added the test case to verify the Entropy source security (#780) --- anta/tests/security.py | 34 +++++++++++++++++++++++++ examples/tests.yaml | 1 + tests/units/anta_tests/test_security.py | 15 +++++++++++ 3 files changed, 50 insertions(+) diff --git a/anta/tests/security.py b/anta/tests/security.py index 4eb4d6415..ae5b9bebd 100644 --- a/anta/tests/security.py +++ b/anta/tests/security.py @@ -820,3 +820,37 @@ def test(self) -> None: self.result.is_failure( f"IPv4 security connection `source:{source_input} destination:{destination_input} vrf:{vrf}` for peer `{peer}` is not found." ) + + +class VerifyHardwareEntropy(AntaTest): + """ + Verifies hardware entropy generation is enabled on device. + + Expected Results + ---------------- + * Success: The test will pass if hardware entropy generation is enabled. + * Failure: The test will fail if hardware entropy generation is not enabled. + + Examples + -------- + ```yaml + anta.tests.security: + - VerifyHardwareEntropy: + ``` + """ + + name = "VerifyHardwareEntropy" + description = "Verifies hardware entropy generation is enabled on device." + categories: ClassVar[list[str]] = ["security"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show management security")] + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifyHardwareEntropy.""" + command_output = self.instance_commands[0].json_output + + # Check if hardware entropy generation is enabled. + if not command_output.get("hardwareEntropyEnabled"): + self.result.is_failure("Hardware entropy generation is disabled.") + else: + self.result.is_success() diff --git a/examples/tests.yaml b/examples/tests.yaml index 58161972f..c5f87fae7 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -354,6 +354,7 @@ anta.tests.security: destination_address: 100.64.2.2 - source_address: 172.18.3.2 destination_address: 172.18.2.2 + - VerifyHardwareEntropy: anta.tests.services: - VerifyHostname: diff --git a/tests/units/anta_tests/test_security.py b/tests/units/anta_tests/test_security.py index 3a732bdaa..eabc40bd8 100644 --- a/tests/units/anta_tests/test_security.py +++ b/tests/units/anta_tests/test_security.py @@ -15,6 +15,7 @@ VerifyAPISSLCertificate, VerifyBannerLogin, VerifyBannerMotd, + VerifyHardwareEntropy, VerifyIPSecConnHealth, VerifyIPv4ACL, VerifySpecificIPSecConn, @@ -1213,4 +1214,18 @@ ], }, }, + { + "name": "success", + "test": VerifyHardwareEntropy, + "eos_data": [{"cpuModel": "2.20GHz", "cryptoModule": "Crypto Module v3.0", "hardwareEntropyEnabled": True, "blockedNetworkProtocols": []}], + "inputs": {}, + "expected": {"result": "success"}, + }, + { + "name": "failure", + "test": VerifyHardwareEntropy, + "eos_data": [{"cpuModel": "2.20GHz", "cryptoModule": "Crypto Module v3.0", "hardwareEntropyEnabled": False, "blockedNetworkProtocols": []}], + "inputs": {}, + "expected": {"result": "failure", "messages": ["Hardware entropy generation is disabled."]}, + }, ]