Skip to content

Commit

Permalink
Introduce new IP address attributes to remove intern_ip
Browse files Browse the repository at this point in the history
  • Loading branch information
TuxPowered42 committed Dec 11, 2023
1 parent f060aa7 commit dc08c29
Show file tree
Hide file tree
Showing 4 changed files with 315 additions and 28 deletions.
2 changes: 1 addition & 1 deletion serveradmin/serverdb/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def clean(self):
# It makes no sense to add inet or supernet attributes to hosts of
# ip_addr_type null because they would have to be empty anyways.
inet_attribute = (
self.cleaned_data['attribute'].type in ('inet', 'supernet') and
self.cleaned_data['attribute'].type in ('inet', 'inet4', 'inet6', 'supernet') and
self.instance.servertype.ip_addr_type == 'null'
)
if inet_attribute:
Expand Down
217 changes: 214 additions & 3 deletions serveradmin/serverdb/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""

import re
from collections import defaultdict
from distutils.util import strtobool
from ipaddress import (
IPv4Address,
Expand All @@ -13,7 +14,7 @@
IPv6Interface,
ip_network,
IPv4Network,
IPv6Network,
IPv6Network, AddressValueError, NetmaskValueError,
)
from typing import Union

Expand All @@ -38,6 +39,8 @@
'reverse': str,
'number': lambda x: float(x) if '.' in str(x) else int(x),
'inet': lambda x: inet_to_python(x),
'inet4': lambda x: inet4_to_python(x),
'inet6': lambda x: inet6_to_python(x),
'macaddr': EUI,
'date': str,
'datetime': str,
Expand Down Expand Up @@ -77,6 +80,47 @@ def get_choices(types):
return zip(*([types] * 2))


def is_supernet_consistent(ip_address, server):
"""Check if requested IP address is consistent with supernets for all other IP address of a server."""

if ip_address.version == 4:
server_attribute_cls = ServerInet4Attribute
else:
server_attribute_cls = ServerInet6Attribute

# A server belongs to different supernets depending on network servertype
supernet_attributes = Attribute.objects.filter(
type='supernet',
servertype_attributes__servertype_id=server.servertype_id,
)
supernet_servertypes = [x.target_servertype for x in supernet_attributes]

for supernet_servertype in supernet_servertypes:
supernet_1 = server.get_supernet(supernet_servertype) # The result is guaranteed to be consistent

if supernet_1 is None:
continue

# Check the supernet of the new IP address
try:
supernet_2 = server_attribute_cls.objects.get(
value__net_contains_or_equals=ip_address,
server__servertype__ip_addr_type='network',
server__servertype_id=supernet_servertype,
).server
except server_attribute_cls.DoesNotExist:
print(f'Zupa: {supernet_1} DoesNotExist')
# There are some provider networks which are valid only for one address family.
# TODO: Enforce validation once the "local" network is gone.
continue

if supernet_1 != supernet_2:
raise ValidationError(
f'Non-matching {supernet_servertype} {supernet_2} for IP address {ip_address}, '
f'other IP addresses of {server.hostname} are in {supernet_1}'
)


# TODO: Make validators out of the methods is_ip_address, is_unique and
# is_network and attach them to the model fields validators.
def is_ip_address(ip_interface: Union[IPv4Interface, IPv6Interface]) -> None:
Expand Down Expand Up @@ -106,16 +150,30 @@ def is_unique_ip(ip_interface: Union[IPv4Interface, IPv6Interface],
:return:
"""

if ip_interface.version == 4:
server_attribute_cls = ServerInet4Attribute
else:
server_attribute_cls = ServerInet6Attribute

# We avoid querying the duplicate hosts here and giving the user
# detailed information because checking with exists is cheaper than
# querying the server and this is a validation and should be fast.
has_duplicates = (
# TODO: Remove "intern_ip" support.
Server.objects.filter(intern_ip=ip_interface).exclude(
Q(servertype__ip_addr_type='network') |
Q(server_id=object_id)
).exists() or
# TODO: Remove "primary_ip6" support.
ServerInetAttribute.objects.filter(value=ip_interface).exclude(
server__servertype__ip_addr_type='network').exists())
Q(server__servertype__ip_addr_type='network') |
Q(server_id=object_id)
).exists() or
server_attribute_cls.objects.filter(value=ip_interface).exclude(
Q(server__servertype__ip_addr_type='network') |
Q(server_id=object_id)
).exists()
)
if has_duplicates:
raise ValidationError(
'An object with {0} already exists'.format(str(ip_interface)))
Expand Down Expand Up @@ -151,6 +209,20 @@ def inet_to_python(obj: object) -> Union[IPv4Interface, IPv6Interface]:
except ValueError as error:
raise ValidationError(str(error))

# WARNING: called only for edit->commit, not for commit!
def inet4_to_python(obj: object) -> IPv4Interface:
try:
return IPv4Interface(obj)
except (AddressValueError, NetmaskValueError):
raise ValidationError(f'{obj} does not appear to be an IPv4 interface')


def inet6_to_python(obj: object) -> IPv4Interface:
try:
return IPv6Interface(obj)
except (AddressValueError, NetmaskValueError):
raise ValidationError(f'{obj} does not appear to be an IPv6 interface')


def network_overlaps(ip_interface: Union[IPv4Interface, IPv6Interface],
servertype_id: str, object_id: int) -> None:
Expand Down Expand Up @@ -226,6 +298,7 @@ def __init__(self, *args, **kwargs):
max_length=32,
choices=get_choices(ATTRIBUTE_TYPES.keys()),
)
supernet = models.BooleanField(null=False, default=False)
multi = models.BooleanField(null=False, default=False)
hovertext = models.TextField(null=False, blank=True, default='')
group = models.CharField(
Expand Down Expand Up @@ -438,11 +511,52 @@ def __str__(self):
return self.hostname

def get_supernet(self, servertype):
return Server.objects.get(
"""Get a supernet of given servertype for the current server.
This function will check all IP addresses of a server which have the "supernet" feature enabled.
If data is inconsistent, an exception is raised.
No matching network for just some of addresses does not mean inconsitency.
"""

supernet_1 = None

# TODO: Remove "intern_ip" support.
supernet_1 = Server.objects.get(
servertype=servertype,
# It should probably match on ip_addr_type too, but we will remove this soon anyway.
intern_ip__net_contains_or_equals=self.intern_ip,
)

for server_attribute_cls in (ServerInet4Attribute, ServerInet6Attribute):
ip_addresses = server_attribute_cls.objects.filter(
server_id=self.server_id,
attribute__supernet=True,
)

# TODO: How to net_contains_or_equals for iterable?
for ip_address in ip_addresses:
try:
attr = server_attribute_cls.objects.get(
value__net_contains_or_equals=ip_address.value,
server__servertype__ip_addr_type='network',
server__servertype_id=servertype.servertype_id,
)
if not attr.attribute.supernet:
raise ValidationError(f'Not a supernet: {servertype}!')
supernet_2 = attr.server
# TODO: Shouldn't we check that the requested servertype really point
except server_attribute_cls.DoesNotExist:
continue
else:
# Always trust the 1st found network
if supernet_1 is None:
supernet_1 = supernet_2
# Verify that all found networks match the 1st found one.
elif supernet_1 != supernet_2:
raise ValidationError(f'Can\'t determine {servertype} for {self.hostname}!')

return supernet_1

def clean(self):
super(Server, self).clean()

Expand Down Expand Up @@ -474,6 +588,9 @@ def clean(self):
network_overlaps(self.intern_ip, self.servertype.servertype_id,
self.server_id)

if ip_addr_type != 'null':
is_supernet_consistent(self.intern_ip, self.server)

def get_attributes(self, attribute):
model = ServerAttribute.get_model(attribute.type)
return model.objects.filter(server=self, attribute=attribute)
Expand Down Expand Up @@ -520,6 +637,10 @@ def get_model(attribute_type):
return ServerNumberAttribute
if attribute_type == 'inet':
return ServerInetAttribute
if attribute_type == 'inet4':
return ServerInet4Attribute
if attribute_type == 'inet6':
return ServerInet6Attribute
if attribute_type == 'macaddr':
return ServerMACAddressAttribute
if attribute_type == 'date':
Expand Down Expand Up @@ -695,6 +816,96 @@ def clean(self):
network_overlaps(self.value, self.server.servertype_id,
self.server.server_id)

is_supernet_consistent(self.value, self.server)


class ServerInet4Attribute(ServerAttribute):
attribute = models.ForeignKey(
Attribute,
db_index=False,
on_delete=models.CASCADE,
limit_choices_to=dict(type='inet4'),
)
value = netfields.InetAddressField()

class Meta:
app_label = 'serverdb'
db_table = 'server_inet4_attribute'
unique_together = [['server', 'attribute', 'value']]
index_together = [['attribute', 'value']]

def clean(self):
super(ServerAttribute, self).clean()

if type(self.value) != IPv4Interface:
self.value = inet4_to_python(self.value)

# Get the ip_addr_type of the servertype
ip_addr_type = self.server.servertype.ip_addr_type

if ip_addr_type == 'null':
# A Servertype with ip_addr_type "null" and attributes of type
# inet must be denied per configuration. This is just a safety net
# in case e.g. somebody creates them programmatically.
raise ValidationError(
_('%(attribute_id)s must be null'), code='invalid value',
params={'attribute_id': self.attribute_id})
elif ip_addr_type == 'host':
is_ip_address(self.value)
is_unique_ip(self.value, self.server.server_id)
elif ip_addr_type == 'loadbalancer':
is_ip_address(self.value)
elif ip_addr_type == 'network':
is_network(self.value)
network_overlaps(self.value, self.server.servertype_id,
self.server.server_id)

is_supernet_consistent(self.value, self.server)


class ServerInet6Attribute(ServerAttribute):
attribute = models.ForeignKey(
Attribute,
db_index=False,
on_delete=models.CASCADE,
limit_choices_to=dict(type='inet6'),
)
value = netfields.InetAddressField()

class Meta:
app_label = 'serverdb'
db_table = 'server_inet6_attribute'
unique_together = [['server', 'attribute', 'value']]
index_together = [['attribute', 'value']]

def clean(self):
super(ServerAttribute, self).clean()

if type(self.value) != IPv6Interface:
self.value = inet6_to_python(self.value)

# Get the ip_addr_type of the servertype
ip_addr_type = self.server.servertype.ip_addr_type

if ip_addr_type == 'null':
# A Servertype with ip_addr_type "null" and attributes of type
# inet must be denied per configuration. This is just a safety net
# in case e.g. somebody creates them programmatically.
raise ValidationError(
_('%(attribute_id)s must be null'), code='invalid value',
params={'attribute_id': self.attribute_id})
elif ip_addr_type == 'host':
is_ip_address(self.value)
is_unique_ip(self.value, self.server.server_id)
elif ip_addr_type == 'loadbalancer':
is_ip_address(self.value)
elif ip_addr_type == 'network':
is_network(self.value)
network_overlaps(self.value, self.server.servertype_id,
self.server.server_id)

is_supernet_consistent(self.value, self.server)


class ServerMACAddressAttribute(ServerAttribute):
attribute = models.ForeignKey(
Expand Down
Loading

0 comments on commit dc08c29

Please sign in to comment.