Skip to content

Commit

Permalink
Merge branch '2.x' into add-url-option-to-brew.tap
Browse files Browse the repository at this point in the history
  • Loading branch information
znd4 authored Nov 30, 2023
2 parents b028836 + 9256e6a commit 69b0b26
Show file tree
Hide file tree
Showing 20 changed files with 520 additions and 194 deletions.
12 changes: 3 additions & 9 deletions pyinfra/api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,12 @@ class ConfigDefaults:
def check_pyinfra_version(version: str):
if not version:
return

running_version = parse_version(__version__)
required_versions = Requirement.parse(
"pyinfra{0}".format(version),
)
required_versions = Requirement.parse("pyinfra{0}".format(version))

if running_version not in required_versions:
if not required_versions.specifier.contains(running_version):
raise PyinfraError(
("pyinfra version requirement not met " "(requires {0}, running {1})").format(
version,
__version__,
),
f"pyinfra version requirement not met (requires {version}, running {__version__})"
)


Expand Down
289 changes: 162 additions & 127 deletions pyinfra/facts/hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,156 +97,191 @@ def process(self, output):
return devices


nettools_1_regexes = [
(
r"^inet addr:([0-9\.]+).+Bcast:([0-9\.]+).+Mask:([0-9\.]+)$",
("ipv4", "address", "broadcast", "netmask"),
),
(
r"^inet6 addr: ([0-9a-z:]+)\/([0-9]+) Scope:Global",
("ipv6", "address", "mask_bits"),
),
]

nettools_2_regexes = [
(
r"^inet ([0-9\.]+)\s+netmask ([0-9\.fx]+)(?:\s+broadcast ([0-9\.]+))?$",
("ipv4", "address", "netmask", "broadcast"),
),
(
r"^inet6 ([0-9a-z:]+)\s+prefixlen ([0-9]+)",
("ipv6", "address", "mask_bits"),
),
]

iproute2_regexes = [
(
r"^inet ([0-9\.]+)\/([0-9]{1,2})(?:\s+brd ([0-9\.]+))?",
("ipv4", "address", "mask_bits", "broadcast"),
),
(
r"^inet6 ([0-9a-z:]+)\/([0-9]{1,3})",
("ipv6", "address", "mask_bits"),
),
]


def _parse_regexes(regexes, lines):
data = {
"ipv4": {},
"ipv6": {},
}

for line in lines:
for regex, groups in regexes:
matches = re.match(regex, line)
if matches:
ip_data = {}

for i, group in enumerate(groups[1:]):
ip_data[group] = matches.group(i + 1)

if "mask_bits" in ip_data:
ip_data["mask_bits"] = int(ip_data["mask_bits"])

target_group = data[groups[0]]
if target_group.get("address"):
target_group.setdefault("additional_ips", []).append(ip_data)
else:
target_group.update(ip_data)

break

return data


class NetworkDevices(FactBase):
"""
Gets & returns a dict of network devices. See the ``ipv4_addresses`` and
``ipv6_addresses`` facts for easier-to-use shortcuts to get device addresses.
.. code:: python
{
"eth0": {
"ipv4": {
"address": "127.0.0.1",
"broadcast": "127.0.0.13",
# Only one of these will exist:
"netmask": "255.255.255.255",
"mask_bits": 32,
},
"ipv6": {
"address": "fe80::a00:27ff:fec3:36f0",
"mask_bits": 64,
"additional_ips": [{
"address": "fe80::",
"mask_bits": 128,
}],
}
"enp1s0": {
"ether": "12:34:56:78:9A:BC",
"mtu": 1500,
"state": "UP",
"ipv4": {
"address": "192.168.1.100",
"mask_bits": 24,
"netmask": "255.255.255.0"
},
"ipv6": {
"address": "2001:db8:85a3::8a2e:370:7334",
"mask_bits": 64,
"additional_ips": [
{
"address": "fe80::1234:5678:9abc:def0",
"mask_bits": 64
}
]
}
},
"incusbr0": {
"ether": "DE:AD:BE:EF:CA:FE",
"mtu": 1500,
"state": "UP",
"ipv4": {
"address": "10.0.0.1",
"mask_bits": 24,
"netmask": "255.255.255.0"
},
"ipv6": {
"address": "fe80::dead:beef:cafe:babe",
"mask_bits": 64,
"additional_ips": [
{
"address": "2001:db8:1234:5678::1",
"mask_bits": 64
}
]
}
},
"lo": {
"mtu": 65536,
"state": "UP",
"ipv6": {
"address": "::1",
"mask_bits": 128
}
},
"veth98806fd6": {
"ether": "AA:BB:CC:DD:EE:FF",
"mtu": 1500,
"state": "UP"
},
"vethda29df81": {
"ether": "11:22:33:44:55:66",
"mtu": 1500,
"state": "UP"
},
"wlo1": {
"ether": "77:88:99:AA:BB:CC",
"mtu": 1500,
"state": "UNKNOWN"
}
"""

command = "ip addr show 2> /dev/null || ifconfig"
command = "ip addr show 2> /dev/null || ifconfig -a"
default = dict

# Definition of valid interface names for Linux:
# https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/net/core/dev.c?h=v5.1.3#n1020
_start_regexes = [
(
r"^([^/: \s]+)\s+Link encap:",
lambda lines: _parse_regexes(nettools_1_regexes, lines),
),
(
r"^([^/: \s]+): flags=",
lambda lines: _parse_regexes(nettools_2_regexes, lines),
),
(
r"^[0-9]+: ([^/: \s]+): ",
lambda lines: _parse_regexes(iproute2_regexes, lines),
),
]

def process(self, output):
devices = {}

# Store current matches (start lines), the handler and any lines
matches = None
handler = None
line_buffer = []
def mask(value):
try:
if value.startswith("0x"):
mask_bits = bin(int(value, 16)).count("1")
else:
mask_bits = int(value)
netmask = ".".join(
str((0xFFFFFFFF << (32 - b) >> mask_bits) & 0xFF) for b in (24, 16, 8, 0)
)
except ValueError:
mask_bits = sum(bin(int(x)).count("1") for x in value.split("."))
netmask = value

for line in output:
line = line.strip()
return mask_bits, netmask

matched = False
# Strip lines and merge them as a block of text
output = "\n".join(map(str.strip, output))

# Look for start lines
for regex, new_handler in self._start_regexes:
new_matches = re.match(regex, line)
# Splitting the output into sections per network device
device_sections = re.split(r"\n(?=\d+: \w|\w+:.*mtu.*)", output)

# If we find a start line
if new_matches:
matched = True
# Dictionary to hold all device information
all_devices = {}

# Assign any current matches with current handler, reset buffer
if matches:
devices[matches.group(1)] = handler(line_buffer)
line_buffer = []
for section in device_sections:
# Extracting the device name
device_name_match = re.match(r"^(?:\d+: )?([\w@]+):", section)
if not device_name_match:
continue
device_name = device_name_match.group(1)

# Regular expressions to match different parts of the output
ether_re = re.compile(r"([0-9A-Fa-f:]{17})")
mtu_re = re.compile(r"mtu (\d+)")
ipv4_re = (
re.compile(
r"inet (\d+\.\d+\.\d+\.\d+)/(\d+)(?: brd (\d+\.\d+\.\d+\.\d+))"
), # ip a output,
re.compile(
r"inet (\d+\.\d+\.\d+\.\d+)\s+netmask\s+((?:\d+\.\d+\.\d+\.\d+)|(?:[0-9a-fA-FxX]+))(?:\s+broadcast\s+(\d+\.\d+\.\d+\.\d+))" # noqa: E501
), # ifconfig -a output
)

# Parsing the output
ether = ether_re.search(section)
mtu = mtu_re.search(section)

# Building the result dictionary for the device
device_info = {}
if ether:
device_info["ether"] = ether.group(1)
if mtu:
device_info["mtu"] = int(mtu.group(1))

device_info["state"] = (
"UP" if "UP" in section else "DOWN" if "DOWN" in section else "UNKNOWN"
)

# IPv4 Addresses
for ipv4_re_ in ipv4_re:
ipv4_matches = ipv4_re_.findall(section)
if ipv4_matches:
break

# Set new matches/handler
matches = new_matches
handler = new_handler
if ipv4_matches:
ipv4_info = []
for ipv4 in ipv4_matches:
address = ipv4[0]
mask_value = ipv4[1]
mask_bits, netmask = mask(mask_value)
broadcast = ipv4[2] if len(ipv4) == 3 else None

ipv4_info.append(
{
"address": address,
"mask_bits": mask_bits,
"netmask": netmask,
"broadcast": broadcast,
},
)
device_info["ipv4"] = ipv4_info[0]
if len(ipv4_matches) > 1:
device_info["ipv4"]["additional_ips"] = ipv4_info[1:]

# IPv6 Addresses
ipv6_re = (
re.compile(r"inet6\s+([0-9a-fA-F:]+)/(\d+)"),
re.compile(r"inet6\s+([0-9a-fA-F:]+)\s+prefixlen\s+(\d+)"),
)

for ipv6_re_ in ipv6_re:
ipv6_matches = ipv6_re_.findall(section)
if ipv6_matches:
break

if not matched:
line_buffer.append(line)
if ipv6_matches:
ipv6_info = []
for ipv6 in ipv6_matches:
address = ipv6[0]
mask_bits = ipv6[1] or ipv6[2]
ipv6_info.append({"address": address, "mask_bits": int(mask_bits)})
device_info["ipv6"] = ipv6_info[0]
if len(ipv6_matches) > 1:
device_info["ipv6"]["additional_ips"] = ipv6_info[1:]

# Handle any left over matches
if matches:
devices[matches.group(1)] = handler(line_buffer)
all_devices[device_name] = device_info

return devices
return all_devices


class Ipv4Addrs(ShortFactBase):
Expand All @@ -273,7 +308,7 @@ def process_data(self, data):
ips = []

ip_details = details.get(self.ip_type)
if not ip_details:
if not ip_details or not ip_details.get("address"):
continue

ips.append(ip_details["address"])
Expand Down Expand Up @@ -332,7 +367,7 @@ def process_data(self, data):

for interface, details in data.items():
ip_details = details.get(self.ip_type)
if not ip_details:
if not ip_details or not ip_details.get("address"):
continue # pragma: no cover

addresses[interface] = ip_details["address"]
Expand Down
Loading

0 comments on commit 69b0b26

Please sign in to comment.