Skip to content

Commit

Permalink
Add new attributes for separate IPv4 and IPv6 addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
TuxPowered42 committed Feb 14, 2024
1 parent f060aa7 commit ed19964
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 5 deletions.
2 changes: 1 addition & 1 deletion serveradmin/serverdb/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def clean(self):
# It makes no sense to add inet or supernet attributes to hosts of
# ip_addr_type null because they would have to be empty anyways.
inet_attribute = (
self.cleaned_data['attribute'].type in ('inet', 'supernet') and
self.cleaned_data['attribute'].type in ('inet', 'inet4', 'inet6', 'supernet') and
self.instance.servertype.ip_addr_type == 'null'
)
if inet_attribute:
Expand Down
48 changes: 48 additions & 0 deletions serveradmin/serverdb/migrations/0017_new_ip_address_attributes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Generated by Django 3.2.20 on 2024-02-14 16:29

from django.db import migrations, models
import django.db.models.deletion
import netfields.fields


class Migration(migrations.Migration):

dependencies = [
('serverdb', '0016_optional_servertype_for_relation'),
]

operations = [
migrations.AlterField(
model_name='attribute',
name='type',
field=models.CharField(choices=[('string', 'string'), ('boolean', 'boolean'), ('relation', 'relation'), ('reverse', 'reverse'), ('number', 'number'), ('inet', 'inet'), ('inet4', 'inet4'), ('inet6', 'inet6'), ('macaddr', 'macaddr'), ('date', 'date'), ('datetime', 'datetime'), ('supernet', 'supernet'), ('domain', 'domain')], max_length=32),
),
migrations.CreateModel(
name='ServerInet6Attribute',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('value', netfields.fields.InetAddressField(max_length=39)),
('attribute', models.ForeignKey(db_index=False, limit_choices_to={'type': 'inet6'}, on_delete=django.db.models.deletion.CASCADE, to='serverdb.attribute')),
('server', models.ForeignKey(db_index=False, on_delete=django.db.models.deletion.CASCADE, to='serverdb.server')),
],
options={
'db_table': 'server_inet6_attribute',
'unique_together': {('server', 'attribute', 'value')},
'index_together': {('attribute', 'value')},
},
),
migrations.CreateModel(
name='ServerInet4Attribute',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('value', netfields.fields.InetAddressField(max_length=39)),
('attribute', models.ForeignKey(db_index=False, limit_choices_to={'type': 'inet4'}, on_delete=django.db.models.deletion.CASCADE, to='serverdb.attribute')),
('server', models.ForeignKey(db_index=False, on_delete=django.db.models.deletion.CASCADE, to='serverdb.server')),
],
options={
'db_table': 'server_inet4_attribute',
'unique_together': {('server', 'attribute', 'value')},
'index_together': {('attribute', 'value')},
},
),
]
122 changes: 120 additions & 2 deletions serveradmin/serverdb/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
IPv6Interface,
ip_network,
IPv4Network,
IPv6Network,
IPv6Network, AddressValueError, NetmaskValueError,
)
from typing import Union

Expand All @@ -38,6 +38,8 @@
'reverse': str,
'number': lambda x: float(x) if '.' in str(x) else int(x),
'inet': lambda x: inet_to_python(x),
'inet4': lambda x: inet4_to_python(x),
'inet6': lambda x: inet6_to_python(x),
'macaddr': EUI,
'date': str,
'datetime': str,
Expand Down Expand Up @@ -106,16 +108,31 @@ def is_unique_ip(ip_interface: Union[IPv4Interface, IPv6Interface],
:return:
"""

if ip_interface.version == 4:
server_attribute_cls = ServerInet4Attribute
else:
server_attribute_cls = ServerInet6Attribute

# We avoid querying the duplicate hosts here and giving the user
# detailed information because checking with exists is cheaper than
# querying the server and this is a validation and should be fast.
# Always exclude the current object_id from the query because we allow
# duplication of data between the legacy (intern_ip, primary_ip6) and
# the modern (ipv4, ipv6) attributes.
has_duplicates = (
Server.objects.filter(intern_ip=ip_interface).exclude(
Q(servertype__ip_addr_type='network') |
Q(server_id=object_id)
).exists() or
ServerInetAttribute.objects.filter(value=ip_interface).exclude(
server__servertype__ip_addr_type='network').exists())
Q(server__servertype__ip_addr_type='network') |
Q(server_id=object_id)
).exists() or
server_attribute_cls.objects.filter(value=ip_interface).exclude(
Q(server__servertype__ip_addr_type='network') |
Q(server_id=object_id)
).exists()
)
if has_duplicates:
raise ValidationError(
'An object with {0} already exists'.format(str(ip_interface)))
Expand Down Expand Up @@ -151,6 +168,20 @@ def inet_to_python(obj: object) -> Union[IPv4Interface, IPv6Interface]:
except ValueError as error:
raise ValidationError(str(error))

# WARNING: called only for edit->commit, not for commit!
def inet4_to_python(obj: object) -> IPv4Interface:
try:
return IPv4Interface(obj)
except (AddressValueError, NetmaskValueError):
raise ValidationError(f'{obj} does not appear to be an IPv4 interface')


def inet6_to_python(obj: object) -> IPv6Interface:
try:
return IPv6Interface(obj)
except (AddressValueError, NetmaskValueError):
raise ValidationError(f'{obj} does not appear to be an IPv6 interface')


def network_overlaps(ip_interface: Union[IPv4Interface, IPv6Interface],
servertype_id: str, object_id: int) -> None:
Expand Down Expand Up @@ -520,6 +551,10 @@ def get_model(attribute_type):
return ServerNumberAttribute
if attribute_type == 'inet':
return ServerInetAttribute
if attribute_type == 'inet4':
return ServerInet4Attribute
if attribute_type == 'inet6':
return ServerInet6Attribute
if attribute_type == 'macaddr':
return ServerMACAddressAttribute
if attribute_type == 'date':
Expand Down Expand Up @@ -695,6 +730,89 @@ def clean(self):
network_overlaps(self.value, self.server.servertype_id,
self.server.server_id)

class ServerInet4Attribute(ServerAttribute):
attribute = models.ForeignKey(
Attribute,
db_index=False,
on_delete=models.CASCADE,
limit_choices_to=dict(type='inet4'),
)
value = netfields.InetAddressField()

class Meta:
app_label = 'serverdb'
db_table = 'server_inet4_attribute'
unique_together = [['server', 'attribute', 'value']]
index_together = [['attribute', 'value']]

def clean(self):
super(ServerAttribute, self).clean()

if type(self.value) != IPv4Interface:
self.value = inet4_to_python(self.value)

# Get the ip_addr_type of the servertype
ip_addr_type = self.server.servertype.ip_addr_type

if ip_addr_type == 'null':
# A Servertype with ip_addr_type "null" and attributes of type
# inet must be denied per configuration. This is just a safety net
# in case e.g. somebody creates them programmatically.
raise ValidationError(
_('%(attribute_id)s must be null'), code='invalid value',
params={'attribute_id': self.attribute_id})
elif ip_addr_type == 'host':
is_ip_address(self.value)
is_unique_ip(self.value, self.server.server_id)
elif ip_addr_type == 'loadbalancer':
is_ip_address(self.value)
elif ip_addr_type == 'network':
is_network(self.value)
network_overlaps(self.value, self.server.servertype_id,
self.server.server_id)


class ServerInet6Attribute(ServerAttribute):
attribute = models.ForeignKey(
Attribute,
db_index=False,
on_delete=models.CASCADE,
limit_choices_to=dict(type='inet6'),
)
value = netfields.InetAddressField()

class Meta:
app_label = 'serverdb'
db_table = 'server_inet6_attribute'
unique_together = [['server', 'attribute', 'value']]
index_together = [['attribute', 'value']]

def clean(self):
super(ServerAttribute, self).clean()

if type(self.value) != IPv6Interface:
self.value = inet6_to_python(self.value)

# Get the ip_addr_type of the servertype
ip_addr_type = self.server.servertype.ip_addr_type

if ip_addr_type == 'null':
# A Servertype with ip_addr_type "null" and attributes of type
# inet must be denied per configuration. This is just a safety net
# in case e.g. somebody creates them programmatically.
raise ValidationError(
_('%(attribute_id)s must be null'), code='invalid value',
params={'attribute_id': self.attribute_id})
elif ip_addr_type == 'host':
is_ip_address(self.value)
is_unique_ip(self.value, self.server.server_id)
elif ip_addr_type == 'loadbalancer':
is_ip_address(self.value)
elif ip_addr_type == 'network':
is_network(self.value)
network_overlaps(self.value, self.server.servertype_id,
self.server.server_id)


class ServerMACAddressAttribute(ServerAttribute):
attribute = models.ForeignKey(
Expand Down
2 changes: 1 addition & 1 deletion serveradmin/serverdb/query_materializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def _get_attributes(self, server, join_results): # NOQA: C901
if attribute not in self._joined_attributes:
continue

if attribute.type == 'inet':
if attribute.type in ['inet', 'inet4', 'inet6']:
if value is None:
yield attribute.attribute_id, None
else:
Expand Down
2 changes: 1 addition & 1 deletion serveradmin/serverdb/sql_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def _containment_filter_template(attribute, filt):
template = None # To be formatted 2 times
value = filt.value

if attribute.type == 'inet':
if attribute.type in ['inet', 'inet4', 'inet6']:
if isinstance(filt, StartsWith):
template = "{{0}} >>= {0} AND host({{0}}) = host(0{})"
elif isinstance(filt, Contains):
Expand Down

0 comments on commit ed19964

Please sign in to comment.