Skip to content

Commit

Permalink
Merge branch 'main' into bump/cryptography
Browse files Browse the repository at this point in the history
  • Loading branch information
gmuloc authored Aug 26, 2024
2 parents ec2667e + 61e206e commit 9f6294c
Show file tree
Hide file tree
Showing 17 changed files with 1,271 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ repos:
- '<!--| ~| -->'

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.5.7
rev: v0.6.1
hooks:
- id: ruff
name: Run Ruff linter
Expand Down
32 changes: 32 additions & 0 deletions anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,35 @@ def validate_regex(value: str) -> str:
Hostname = Annotated[str, Field(pattern=REGEXP_TYPE_HOSTNAME)]
Port = Annotated[int, Field(ge=1, le=65535)]
RegexString = Annotated[str, AfterValidator(validate_regex)]
BgpDropStats = Literal[
"inDropAsloop",
"inDropClusterIdLoop",
"inDropMalformedMpbgp",
"inDropOrigId",
"inDropNhLocal",
"inDropNhAfV6",
"prefixDroppedMartianV4",
"prefixDroppedMaxRouteLimitViolatedV4",
"prefixDroppedMartianV6",
"prefixDroppedMaxRouteLimitViolatedV6",
"prefixLuDroppedV4",
"prefixLuDroppedMartianV4",
"prefixLuDroppedMaxRouteLimitViolatedV4",
"prefixLuDroppedV6",
"prefixLuDroppedMartianV6",
"prefixLuDroppedMaxRouteLimitViolatedV6",
"prefixEvpnDroppedUnsupportedRouteType",
"prefixBgpLsDroppedReceptionUnsupported",
"outDropV4LocalAddr",
"outDropV6LocalAddr",
"prefixVpnIpv4DroppedImportMatchFailure",
"prefixVpnIpv4DroppedMaxRouteLimitViolated",
"prefixVpnIpv6DroppedImportMatchFailure",
"prefixVpnIpv6DroppedMaxRouteLimitViolated",
"prefixEvpnDroppedImportMatchFailure",
"prefixEvpnDroppedMaxRouteLimitViolated",
"prefixRtMembershipDroppedLocalAsReject",
"prefixRtMembershipDroppedMaxRouteLimitViolated",
]
BgpUpdateError = Literal["inUpdErrWithdraw", "inUpdErrIgnore", "inUpdErrDisableAfiSafi", "disabledAfiSafi", "lastUpdErrTime"]
BfdProtocol = Literal["bgp", "isis", "lag", "ospf", "ospfv3", "pim", "route-input", "static-bfd", "static-route", "vrrp", "vxlan"]
82 changes: 79 additions & 3 deletions anta/tests/bfd.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from pydantic import BaseModel, Field

from anta.custom_types import BfdInterval, BfdMultiplier
from anta.custom_types import BfdInterval, BfdMultiplier, BfdProtocol
from anta.models import AntaCommand, AntaTest
from anta.tools import get_value

Expand Down Expand Up @@ -45,7 +45,7 @@ class VerifyBFDSpecificPeers(AntaTest):
name = "VerifyBFDSpecificPeers"
description = "Verifies the IPv4 BFD peer's sessions and remote disc in the specified VRF."
categories: ClassVar[list[str]] = ["bfd"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers", revision=4)]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers", revision=1)]

class Input(AntaTest.Input):
"""Input model for the VerifyBFDSpecificPeers test."""
Expand Down Expand Up @@ -126,7 +126,7 @@ class VerifyBFDPeersIntervals(AntaTest):
name = "VerifyBFDPeersIntervals"
description = "Verifies the timers of the IPv4 BFD peers in the specified VRF."
categories: ClassVar[list[str]] = ["bfd"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers detail", revision=4)]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers detail", revision=1)]

class Input(AntaTest.Input):
"""Input model for the VerifyBFDPeersIntervals test."""
Expand Down Expand Up @@ -285,3 +285,79 @@ def test(self) -> None:
if up_failures:
up_failures_str = "\n".join(up_failures)
self.result.is_failure(f"\nFollowing BFD peers were down:\n{up_failures_str}")


class VerifyBFDPeersRegProtocols(AntaTest):
"""Verifies that IPv4 BFD peer(s) have the specified protocol(s) registered.
Expected Results
----------------
* Success: The test will pass if IPv4 BFD peers are registered with the specified protocol(s).
* Failure: The test will fail if IPv4 BFD peers are not found or the specified protocol(s) are not registered for the BFD peer(s).
Examples
--------
```yaml
anta.tests.bfd:
- VerifyBFDPeersRegProtocols:
bfd_peers:
- peer_address: 192.0.255.7
vrf: default
protocols:
- bgp
```
"""

name = "VerifyBFDPeersRegProtocols"
description = "Verifies that IPv4 BFD peer(s) have the specified protocol(s) registered."
categories: ClassVar[list[str]] = ["bfd"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers detail", revision=1)]

class Input(AntaTest.Input):
"""Input model for the VerifyBFDPeersRegProtocols test."""

bfd_peers: list[BFDPeer]
"""List of IPv4 BFD peers."""

class BFDPeer(BaseModel):
"""Model for an IPv4 BFD peer."""

peer_address: IPv4Address
"""IPv4 address of a BFD peer."""
vrf: str = "default"
"""Optional VRF for BFD peer. If not provided, it defaults to `default`."""
protocols: list[BfdProtocol]
"""List of protocols to be verified."""

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyBFDPeersRegProtocols."""
# Initialize failure messages
failures: dict[Any, Any] = {}

# Iterating over BFD peers, extract the parameters and command output
for bfd_peer in self.inputs.bfd_peers:
peer = str(bfd_peer.peer_address)
vrf = bfd_peer.vrf
protocols = bfd_peer.protocols
bfd_output = get_value(
self.instance_commands[0].json_output,
f"vrfs..{vrf}..ipv4Neighbors..{peer}..peerStats..",
separator="..",
)

# Check if BFD peer configured
if not bfd_output:
failures[peer] = {vrf: "Not Configured"}
continue

# Check registered protocols
difference = set(protocols) - set(get_value(bfd_output, "peerStatsDetail.apps"))

if difference:
failures[peer] = {vrf: sorted(difference)}

if not failures:
self.result.is_success()
else:
self.result.is_failure(f"The following BFD peers are not configured or have non-registered protocol(s):\n{failures}")
180 changes: 179 additions & 1 deletion anta/tests/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from pydantic.v1.utils import deep_update
from pydantic_extra_types.mac_address import MacAddress

from anta.custom_types import Afi, MultiProtocolCaps, Safi, Vni
from anta.custom_types import Afi, BgpDropStats, BgpUpdateError, MultiProtocolCaps, Safi, Vni
from anta.models import AntaCommand, AntaTemplate, AntaTest
from anta.tools import get_item, get_value

Expand Down Expand Up @@ -1226,3 +1226,181 @@ def test(self) -> None:
self.result.is_success()
else:
self.result.is_failure(f"Following BGP peers are not configured or hold and keep-alive timers are not correct:\n{failures}")


class VerifyBGPPeerDropStats(AntaTest):
"""Verifies BGP NLRI drop statistics for the provided BGP IPv4 peer(s).
By default, all drop statistics counters will be checked for any non-zero values.
An optional list of specific drop statistics can be provided for granular testing.
Expected Results
----------------
* Success: The test will pass if the BGP peer's drop statistic(s) are zero.
* Failure: The test will fail if the BGP peer's drop statistic(s) are non-zero/Not Found or peer is not configured.
Examples
--------
```yaml
anta.tests.routing:
bgp:
- VerifyBGPPeerDropStats:
bgp_peers:
- peer_address: 172.30.11.1
vrf: default
drop_stats:
- inDropAsloop
- prefixEvpnDroppedUnsupportedRouteType
```
"""

name = "VerifyBGPPeerDropStats"
description = "Verifies the NLRI drop statistics of a BGP IPv4 peer(s)."
categories: ClassVar[list[str]] = ["bgp"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaTemplate(template="show bgp neighbors {peer} vrf {vrf}", revision=3)]

class Input(AntaTest.Input):
"""Input model for the VerifyBGPPeerDropStats test."""

bgp_peers: list[BgpPeer]
"""List of BGP peers"""

class BgpPeer(BaseModel):
"""Model for a BGP peer."""

peer_address: IPv4Address
"""IPv4 address of a BGP peer."""
vrf: str = "default"
"""Optional VRF for BGP peer. If not provided, it defaults to `default`."""
drop_stats: list[BgpDropStats] | None = None
"""Optional list of drop statistics to be verified. If not provided, test will verifies all the drop statistics."""

def render(self, template: AntaTemplate) -> list[AntaCommand]:
"""Render the template for each BGP peer in the input list."""
return [template.render(peer=str(bgp_peer.peer_address), vrf=bgp_peer.vrf) for bgp_peer in self.inputs.bgp_peers]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyBGPPeerDropStats."""
failures: dict[Any, Any] = {}

for command, input_entry in zip(self.instance_commands, self.inputs.bgp_peers):
peer = command.params.peer
vrf = command.params.vrf
drop_statistics = input_entry.drop_stats

# Verify BGP peer
if not (peer_list := get_value(command.json_output, f"vrfs.{vrf}.peerList")) or (peer_detail := get_item(peer_list, "peerAddress", peer)) is None:
failures[peer] = {vrf: "Not configured"}
continue

# Verify BGP peer's drop stats
drop_stats_output = peer_detail.get("dropStats", {})

# In case drop stats not provided, It will check all drop statistics
if not drop_statistics:
drop_statistics = drop_stats_output

# Verify BGP peer's drop stats
drop_stats_not_ok = {
drop_stat: drop_stats_output.get(drop_stat, "Not Found") for drop_stat in drop_statistics if drop_stats_output.get(drop_stat, "Not Found")
}
if any(drop_stats_not_ok):
failures[peer] = {vrf: drop_stats_not_ok}

# Check if any failures
if not failures:
self.result.is_success()
else:
self.result.is_failure(f"The following BGP peers are not configured or have non-zero NLRI drop statistics counters:\n{failures}")


class VerifyBGPPeerUpdateErrors(AntaTest):
"""Verifies BGP update error counters for the provided BGP IPv4 peer(s).
By default, all update error counters will be checked for any non-zero values.
An optional list of specific update error counters can be provided for granular testing.
Note: For "disabledAfiSafi" error counter field, checking that it's not "None" versus 0.
Expected Results
----------------
* Success: The test will pass if the BGP peer's update error counter(s) are zero/None.
* Failure: The test will fail if the BGP peer's update error counter(s) are non-zero/not None/Not Found or
peer is not configured.
Examples
--------
```yaml
anta.tests.routing:
bgp:
- VerifyBGPPeerUpdateErrors:
bgp_peers:
- peer_address: 172.30.11.1
vrf: default
update_error_filter:
- inUpdErrWithdraw
```
"""

name = "VerifyBGPPeerUpdateErrors"
description = "Verifies the update error counters of a BGP IPv4 peer."
categories: ClassVar[list[str]] = ["bgp"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaTemplate(template="show bgp neighbors {peer} vrf {vrf}", revision=3)]

class Input(AntaTest.Input):
"""Input model for the VerifyBGPPeerUpdateErrors test."""

bgp_peers: list[BgpPeer]
"""List of BGP peers"""

class BgpPeer(BaseModel):
"""Model for a BGP peer."""

peer_address: IPv4Address
"""IPv4 address of a BGP peer."""
vrf: str = "default"
"""Optional VRF for BGP peer. If not provided, it defaults to `default`."""
update_errors: list[BgpUpdateError] | None = None
"""Optional list of update error counters to be verified. If not provided, test will verifies all the update error counters."""

def render(self, template: AntaTemplate) -> list[AntaCommand]:
"""Render the template for each BGP peer in the input list."""
return [template.render(peer=str(bgp_peer.peer_address), vrf=bgp_peer.vrf) for bgp_peer in self.inputs.bgp_peers]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyBGPPeerUpdateErrors."""
failures: dict[Any, Any] = {}

for command, input_entry in zip(self.instance_commands, self.inputs.bgp_peers):
peer = command.params.peer
vrf = command.params.vrf
update_error_counters = input_entry.update_errors

# Verify BGP peer.
if not (peer_list := get_value(command.json_output, f"vrfs.{vrf}.peerList")) or (peer_detail := get_item(peer_list, "peerAddress", peer)) is None:
failures[peer] = {vrf: "Not configured"}
continue

# Getting the BGP peer's error counters output.
error_counters_output = peer_detail.get("peerInUpdateErrors", {})

# In case update error counters not provided, It will check all the update error counters.
if not update_error_counters:
update_error_counters = error_counters_output

# verifying the error counters.
error_counters_not_ok = {
("disabledAfiSafi" if error_counter == "disabledAfiSafi" else error_counter): value
for error_counter in update_error_counters
if (value := error_counters_output.get(error_counter, "Not Found")) != "None" and value != 0
}
if error_counters_not_ok:
failures[peer] = {vrf: error_counters_not_ok}

# Check if any failures
if not failures:
self.result.is_success()
else:
self.result.is_failure(f"The following BGP peers are not configured or have non-zero update error counters:\n{failures}")
Loading

0 comments on commit 9f6294c

Please sign in to comment.