Skip to content

Commit

Permalink
Merge pull request #46 from NethServer/objects
Browse files Browse the repository at this point in the history
  • Loading branch information
gsanchietti authored Jul 5, 2024
2 parents a287f1a + 4f12bd9 commit a79da62
Show file tree
Hide file tree
Showing 7 changed files with 1,747 additions and 136 deletions.
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name = 'nethsec',
version = '0.0.66',
version = '0.0.67',
author = 'Giacomo Sanchietti',
author_email = '[email protected]',
description = 'Utilities for NethSecurity development',
Expand All @@ -16,7 +16,7 @@
url = "https://github.com/NethServer/python3-nethsec",
license = "GPLv3",
package_dir = {'': 'src'},
packages = ['nethsec', 'nethsec.utils', 'nethsec.firewall', 'nethsec.mwan', 'nethsec.dpi', 'nethsec.ipsec', 'nethsec.ovpn', 'nethsec.users', 'nethsec.reverse_proxy', 'nethsec.inventory', 'nethsec.conntrack', 'nethsec.ldif'],
packages = ['nethsec', 'nethsec.utils', 'nethsec.firewall', 'nethsec.mwan', 'nethsec.dpi', 'nethsec.ipsec', 'nethsec.ovpn', 'nethsec.users', 'nethsec.reverse_proxy', 'nethsec.inventory', 'nethsec.conntrack', 'nethsec.ldif', 'nethsec.objects'],
requires = [ "pyuci" ],
classifiers = [
"Programming Language :: Python :: 3",
Expand Down
193 changes: 175 additions & 18 deletions src/nethsec/firewall/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from euci import EUci

from nethsec import utils
from nethsec import utils, objects

PROTOCOLS = ['tcp', 'udp', 'udplite', 'icmp', 'esp', 'ah', 'sctp']
TARGETS = ['ACCEPT', 'DROP', 'REJECT']
Expand Down Expand Up @@ -1315,6 +1315,19 @@ def list_host_suggestions(uci):
ret = ret + list_active_leases()
return ret

def list_object_suggestions(uci, expand = False):
"""
Get all objects from objects config
Args:
uci: EUci pointer
expand: if True, expand object details
Returns:
a list of all objects, each object is a dict with keys value, label, type
"""
return objects.list_all_objects(uci, expand)

def validate_address_format(address: str) -> bool:
"""
Validate address format.
Expand Down Expand Up @@ -1386,7 +1399,7 @@ def validate_port_format(port: str) -> bool:
return False
return True

def validate_rule(src: str, src_ip: list[str], dest: str, dest_ip: list[str], proto: list, dest_port: list[str], target: str, service: str):
def validate_rule(uci, src: str, src_ip: list[str], dest: str, dest_ip: list[str], proto: list, dest_port: list[str], target: str, service: str, ns_src: str, ns_dst: str):
"""
Validate rule.
Expand All @@ -1399,17 +1412,29 @@ def validate_rule(src: str, src_ip: list[str], dest: str, dest_ip: list[str], pr
dest_port: a list of destination ports, each element cna be be a port number, a comma-separated list of port numbers or a range with `-` (eg. 80-90)
target: target, must be one of 'ACCEPT', 'REJECT', 'DROP'
service: service name
ns_src: an object in the form `<database>/<id>`
ns_dst: an object in the form `<database>/<id>`
Raises:
ValidationError: if rule is invalid
"""
for s in src_ip:
if not validate_address_format(s):
raise utils.ValidationError('src_ip', 'invalid_format', s)
for d in dest_ip:
if not validate_address_format(d):
raise utils.ValidationError('dest_ip', 'invalid_format', d)
if src == dest:
if ns_src:
if not objects.object_exists(uci, ns_src):
raise utils.ValidationError('ns_src', 'object_not_found', ns_src)
else: # check source only if not using objects
for s in src_ip:
if not validate_address_format(s):
raise utils.ValidationError('src_ip', 'invalid_format', s)
if ns_dst:
if not objects.object_exists(uci, ns_dst):
raise utils.ValidationError('ns_dst', 'object_not_found', ns_dst)
else: # check destiation only if not using objects
for d in dest_ip:
if not validate_address_format(d):
raise utils.ValidationError('dest_ip', 'invalid_format', d)
if ns_src and ns_dst and objects.is_domain_set(uci, ns_src) and objects.is_domain_set(uci, ns_dst):
raise utils.ValidationError('ns_dst', 'domain_set_conflict', ns_dst)
if (not ns_src and not ns_dst) and src == dest: # check only if not using objects
raise utils.ValidationError('dest', 'same_zone', dest)
if target not in TARGETS:
raise utils.ValidationError('target', 'invalid_target', target)
Expand Down Expand Up @@ -1442,7 +1467,7 @@ def get_service_by_name(name: str) -> dict:
return None

def setup_rule(uci, id: str, name: str, src: str, src_ip: list[str], dest: str, dest_ip: list[str], proto: list, dest_port: list[str], target: str, service: str,
enabled: bool = True, log: bool = False, tag = []) -> None:
enabled: bool = True, log: bool = False, tag = [], ns_src: str = None, ns_dst: str = None) -> None:
"""
Set up a rule in the firewall config.
Expand All @@ -1461,6 +1486,8 @@ def setup_rule(uci, id: str, name: str, src: str, src_ip: list[str], dest: str,
enabled: if True, rule is enabled; if False, rule is disabled
log: if True, log traffic
tag: list of optional tags
ns_src: an object in the form `<database>/<id>`
ns_dst: an object in the form `<database>/<id>`
"""
uci.set('firewall', id, 'name', name)
uci.set('firewall', id, 'src', src)
Expand Down Expand Up @@ -1490,6 +1517,20 @@ def setup_rule(uci, id: str, name: str, src: str, src_ip: list[str], dest: str,
uci.set('firewall', id, 'enabled', '1' if enabled else '0')
uci.set('firewall', id, 'log', '1' if log else '0')
uci.set('firewall', id, 'ns_tag', tag)
if ns_src:
uci.set('firewall', id, 'ns_src', ns_src)
else:
try:
uci.delete('firewall', id, 'ns_src')
except:
pass
if ns_dst:
uci.set('firewall', id, 'ns_dst', ns_dst)
else:
try:
uci.delete('firewall', id, 'ns_dst')
except:
pass
uci.save('firewall')

def split_firewall_config(uci):
Expand Down Expand Up @@ -1554,7 +1595,7 @@ def reorder_firewall_config(uci):
uci.save('firewall')

def add_rule(uci, name: str, src: str, src_ip: list[str], dest: str, dest_ip: list[str], proto: list, dest_port: list[str], target: str, service: str,
enabled: bool = True, log: bool = False, tag = [], add_to_top: bool = False) -> str:
enabled: bool = True, log: bool = False, tag = [], add_to_top: bool = False, ns_src: str = None, ns_dst: str = None) -> str:
"""
Add rule to firewall config.
Expand All @@ -1574,16 +1615,18 @@ def add_rule(uci, name: str, src: str, src_ip: list[str], dest: str, dest_ip: li
log: if True, log traffic
tag: list of optional tags
add_to_top: if True, add rule to the top of the list, otherwise add to the bottom
ns_src: an object in the form `<database>/<id>`
ns_dst: an object in the form `<database>/<id>`
Returns:
name of rule config that was added
"""
validate_rule(src, src_ip, dest, dest_ip, proto, dest_port, target, service)
validate_rule(uci, src, src_ip, dest, dest_ip, proto, dest_port, target, service, ns_src, ns_dst)
rule = utils.get_random_id()
uci.set('firewall', rule, 'rule')
setup_rule(uci, rule, name, src, src_ip, dest, dest_ip, proto, dest_port, target, service, enabled, log, tag)
uci.save('firewall')
setup_rule(uci, rule, name, src, src_ip, dest, dest_ip, proto, dest_port, target, service, enabled, log, tag, ns_src, ns_dst)
reorder_firewall_config(uci)
update_firewall_rules(uci) # expand objects and save

if add_to_top:
rule_type = uci.get_all('firewall', rule)
Expand All @@ -1599,7 +1642,7 @@ def add_rule(uci, name: str, src: str, src_ip: list[str], dest: str, dest_ip: li
return rule

def edit_rule(uci, id: str, name: str, src: str, src_ip: list[str], dest: str, dest_ip: list[str], proto: list, dest_port: list[str], target: str, service: str,
enabled: bool = True, log: bool = False, tag = []) -> str:
enabled: bool = True, log: bool = False, tag = [], ns_src: str = None, ns_dst: str = None) -> str:
"""
Edit rule in firewall config.
Expand All @@ -1618,14 +1661,17 @@ def edit_rule(uci, id: str, name: str, src: str, src_ip: list[str], dest: str, d
enabled: if True, rule is enabled, if False, rule is disabled
log: if True, log traffic
tag: list of optional tags
ns_src: an object in the form `<database>/<id>`
ns_dst: an object in the form `<database>/<id>`
Returns:
name of rule config that was edited
"""
if not uci.get('firewall', id, default=None):
raise utils.ValidationError("id", "rule_does_not_exists", id)
validate_rule(src, src_ip, dest, dest_ip, proto, dest_port, target, service)
setup_rule(uci, id, name, src, src_ip, dest, dest_ip, proto, dest_port, target, service, enabled, log, tag)
validate_rule(uci, src, src_ip, dest, dest_ip, proto, dest_port, target, service, ns_src, ns_dst)
setup_rule(uci, id, name, src, src_ip, dest, dest_ip, proto, dest_port, target, service, enabled, log, tag, ns_src, ns_dst)
update_firewall_rules(uci) # expand objects and save
return id

def list_nat_rules(uci) -> list:
Expand Down Expand Up @@ -1866,4 +1912,115 @@ def list_netmap_devices(uci) -> list:
devices.append({"device": device['ifname'], "interface": utils.get_interface_from_device(uci, device['ifname'])})
except:
pass
return devices
return devices

# Objects

def update_redirect_rules(uci):
"""
Update redirect rules with ipset field set and ns_src set.
Args:
uci: EUci pointer
changed_sections: list of changed objects, each object is in the form of `<database>/<id>`
"""
for section in utils.get_all_by_type(uci, 'firewall', 'redirect'):
ns_src = uci.get('firewall', section, 'ns_src', default=None)
ns_dst = uci.get('firewall', section, 'ns_dst', default=None)
if ns_dst:
ipaddr = objects.get_object_ip(uci, ns_dst)
if ipaddr:
uci.set('firewall', section, 'dest_ip', ipaddr)
if ns_src:
database, id = ns_src.split('/')
obj_type = uci.get(database, id)
if database == "objects" and obj_type == "domain":
ipsets = objects.get_domain_set_ipsets(uci, id)
uci.set('firewall', section, 'ipset', f"{ipsets['firewall']} src_net")
try:
uci.delete('firewall', f"{section}_ipset")
except:
pass
else:
uci.set('firewall', section, 'ipset', f"{id}_ipset")
uci.set('firewall', f"{section}_ipset", "ipset")
uci.set('firewall', f"{section}_ipset", "name", f"{id}_ipset")
uci.set('firewall', f"{section}_ipset", "match", "src_net")
uci.set('firewall', f"{section}_ipset", "enabled", "1")
uci.set('firewall', f"{section}_ipset", "entry", objects.get_object_ips(uci, ns_src))

for section in utils.get_all_by_type(uci, 'firewall', 'rule'):
ns_dst = uci.get('firewall', section, 'ns_dst', default=None)
ns_src = uci.get('firewall', section, 'ns_src', default=None)
if ns_dst:
ipaddr = objects.get_object_ips(uci, ns_dst)
if ipaddr:
uci.set('firewall', section, 'dest_ip', ipaddr)
if ns_src:
ipaddr = objects.get_object_ips(uci, ns_dst)
if ipaddr:
uci.set('firewall', section, 'src_ip', ipaddr)
uci.save('firewall')

def update_firewall_rules(uci):
"""
Update firewall rules with ipset field set and ns_src set.
Args:
uci: EUci pointer
"""
for section in utils.get_all_by_type(uci, 'firewall', 'rule'):
keep_ipset = False
ns_src = uci.get('firewall', section, 'ns_src', default=None)
ns_dst = uci.get('firewall', section, 'ns_dst', default=None)
name = uci.get('firewall', section, 'name', default=None)
if ns_src:
if objects.is_domain_set(uci, ns_src):
keep_ipset = True
id = ns_src.split('/')[1]
ipsets = objects.get_domain_set_ipsets(uci, id)
uci.set('firewall', section, 'ipset', f"{ipsets['firewall']} src")
try:
uci.delete('firewall', section, 'src_ip')
except:
pass
else:
ipaddr = objects.get_object_ips(uci, ns_src)
if ipaddr:
uci.set('firewall', section, 'src_ip', ipaddr)
try:
uci.delete('firewall', section, 'ipset')
except:
pass
if ns_dst:
if objects.is_domain_set(uci, ns_dst):
id = ns_dst.split('/')[1]
ipsets = objects.get_domain_set_ipsets(uci, id)
uci.set('firewall', section, 'ipset', f"{ipsets['firewall']} dst")
try:
uci.delete('firewall', section, 'dest_ip')
except:
pass
else:
ipaddr = objects.get_object_ips(uci, ns_dst)
if ipaddr:
uci.set('firewall', section, 'dest_ip', ipaddr)
if not keep_ipset: # do not delete ipset from src if is a domain
try:
uci.delete('firewall', section, 'ipset')
except:
pass
uci.save('firewall')

def list_object_suggestions(uci, expand = False):
"""
Get all objects from objects config
Args:
uci: EUci pointer
expand: if True, expand object details
Returns:
a list of all objects, each object is a dict with keys value, label, type
"""
return objects.list_all_objects(uci, expand)
Loading

0 comments on commit a79da62

Please sign in to comment.