From ab75ff3630f5fc0d2eed9bb8e3756338f1b92013 Mon Sep 17 00:00:00 2001 From: Giacomo Sanchietti Date: Tue, 7 Nov 2023 16:27:32 +0100 Subject: [PATCH 1/6] fead: add ipsec lib --- src/nethsec/ipsec/__init__.py | 38 +++++++++++++++++++++++++++++++++++ tests/test_ipsec.py | 33 ++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 src/nethsec/ipsec/__init__.py create mode 100644 tests/test_ipsec.py diff --git a/src/nethsec/ipsec/__init__.py b/src/nethsec/ipsec/__init__.py new file mode 100644 index 00000000..7b76acdb --- /dev/null +++ b/src/nethsec/ipsec/__init__.py @@ -0,0 +1,38 @@ +#!/usr/bin/python3 + +# +# Copyright (C) 2023 Nethesis S.r.l. +# SPDX-License-Identifier: GPL-2.0-only +# + +''' +IPSec utilities +''' + +import os +from nethsec import utils + +IPSEC_ZONE='ipsec' + +def init_ipsec(uci): + ''' + Initialize IPSec global configuration, if needed. + + Changes are saved to staging area. + + Arguments: + - uci -- EUci pointer + ''' + # Make sure the config file exists + conf = os.path.join(uci.confdir(), 'ipsec') + if not os.path.isfile(conf): + with open(conf, 'a'): + pass + + # Setup global options + gsettings = utils.get_id("ipsec_global") + uci.set("ipsec", gsettings, IPSEC_ZONE) + uci.set("ipsec", gsettings, "debug", '0') + uci.set("ipsec", gsettings, "zone", 'ipsec') + uci.set("ipsec", gsettings, "interface", ['wan']) + uci.commit('ipsec') diff --git a/tests/test_ipsec.py b/tests/test_ipsec.py new file mode 100644 index 00000000..07b464a1 --- /dev/null +++ b/tests/test_ipsec.py @@ -0,0 +1,33 @@ +import pathlib + +import pytest +from euci import EUci +from pytest_mock import MockFixture +from nethsec import ipsec +from nethsec.utils import ValidationError + +ipsec_db = """ +""" + +@pytest.fixture +def e_uci(tmp_path: pathlib.Path) -> EUci: + conf_dir = tmp_path.joinpath('conf') + conf_dir.mkdir() + save_dir = tmp_path.joinpath('save') + save_dir.mkdir() + return EUci(confdir=conf_dir.as_posix(), savedir=save_dir.as_posix()) + + +@pytest.fixture +def e_uci_with_data(e_uci: EUci): + with pathlib.Path(e_uci.confdir()).joinpath('dpi').open('w') as fp: + fp.write(dpi_db) + return e_uci + + +def test_init_ipsec(e_uci): + ipsec.init_ipsec(e_uci) + assert(e_uci.get('ipsec', 'ns_ipsec_global') == 'ipsec') + assert(e_uci.get('ipsec', 'ns_ipsec_global', 'debug') == '0') + assert(e_uci.get('ipsec', 'ns_ipsec_global', 'zone') == ipsec.IPSEC_ZONE) + assert(e_uci.get_all('ipsec', 'ns_ipsec_global', 'interface') == ('wan',)) From 2121cf1b1935bc93d96a3e4ccf97a8bb7809bb65 Mon Sep 17 00:00:00 2001 From: Giacomo Sanchietti Date: Tue, 7 Nov 2023 17:00:56 +0100 Subject: [PATCH 2/6] feat: ipsec, add open_firewall_ports --- src/nethsec/ipsec/__init__.py | 44 +++++++++++++++++++++++++++++++++- tests/test_ipsec.py | 45 ++++++++++++++++++++++++++++++----- 2 files changed, 82 insertions(+), 7 deletions(-) diff --git a/src/nethsec/ipsec/__init__.py b/src/nethsec/ipsec/__init__.py index 7b76acdb..714e9aa8 100644 --- a/src/nethsec/ipsec/__init__.py +++ b/src/nethsec/ipsec/__init__.py @@ -10,7 +10,7 @@ ''' import os -from nethsec import utils +from nethsec import utils, firewall IPSEC_ZONE='ipsec' @@ -36,3 +36,45 @@ def init_ipsec(uci): uci.set("ipsec", gsettings, "zone", 'ipsec') uci.set("ipsec", gsettings, "interface", ['wan']) uci.commit('ipsec') + +def open_firewall_ports(uci): + ''' + Open firewall ports for IPSec tunnels, if need. + + Changes are saved to staging area. + + Arguments: + - uci -- EUci pointer + ''' + esp_accepted = False + ike_accepted = False + nat_accepted = False + esp = {"src": "wan", "dest_port": "", "proto": "esp", "target": "ACCEPT"} + ike = {"src": "wan", "dest_port": "500", "proto": "udp", "target": "ACCEPT"} + nat = {"src": "wan", "dest_port": "4500", "proto": "udp", "target": "ACCEPT"} + # search for existing rules + for r in utils.get_all_by_type(uci, 'firewall', 'rule'): + tmp = dict() + for opt in ['src', 'dest', 'dest_port', 'proto', 'target']: + tmp[opt] = uci.get('firewall', r, opt, default='') + # check if tmp is the esp rule + if all((tmp.get(k) == v for k, v in esp.items())): + esp_accepted = True + # check if tmp is the ike rule + if all((tmp.get(k) == v for k, v in ike.items())): + ike_accepted = True + # check if tmp is the nat rule + if all((tmp.get(k) == v for k, v in nat.items())): + nat_accepted = True + + if not ike_accepted: + firewall.add_template_rule(uci, 'ns_ipsec_ike') + + if not esp_accepted: + firewall.add_template_rule(uci, 'ns_ipsec_esp') + + if not nat_accepted: + firewall.add_template_rule(uci, 'ns_ipsec_nat') + + if not nat_accepted or not ike_accepted or not esp_accepted: + uci.save('firewall') diff --git a/tests/test_ipsec.py b/tests/test_ipsec.py index 07b464a1..3154f2fd 100644 --- a/tests/test_ipsec.py +++ b/tests/test_ipsec.py @@ -3,10 +3,28 @@ import pytest from euci import EUci from pytest_mock import MockFixture -from nethsec import ipsec -from nethsec.utils import ValidationError +from nethsec import ipsec, utils -ipsec_db = """ +templates_db = """ +config template_rule 'ns_ipsec_esp' + option name 'Allow-IPSec-ESP' + option src 'wan' + option proto 'esp' + option target 'ACCEPT' + +config template_rule 'ns_ipsec_ike' + option name 'Allow-IPSec-IKE' + option src 'wan' + option dest_port '500' + option proto 'udp' + option target 'ACCEPT' + +config template_rule 'ns_ipsec_nat' + option name 'Allow-IPSec-NAT' + option src 'wan' + option dest_port '500' + option proto 'udp' + option target 'ACCEPT' """ @pytest.fixture @@ -20,14 +38,29 @@ def e_uci(tmp_path: pathlib.Path) -> EUci: @pytest.fixture def e_uci_with_data(e_uci: EUci): - with pathlib.Path(e_uci.confdir()).joinpath('dpi').open('w') as fp: - fp.write(dpi_db) + with pathlib.Path(e_uci.confdir()).joinpath('templates').open('w') as fp: + fp.write(templates_db) + with pathlib.Path(e_uci.confdir()).joinpath('firewall').open('a') as fp: + pass return e_uci - def test_init_ipsec(e_uci): ipsec.init_ipsec(e_uci) assert(e_uci.get('ipsec', 'ns_ipsec_global') == 'ipsec') assert(e_uci.get('ipsec', 'ns_ipsec_global', 'debug') == '0') assert(e_uci.get('ipsec', 'ns_ipsec_global', 'zone') == ipsec.IPSEC_ZONE) assert(e_uci.get_all('ipsec', 'ns_ipsec_global', 'interface') == ('wan',)) + +def test_open_firewall_ports(e_uci_with_data): + ipsec.open_firewall_ports(e_uci_with_data) + nat = ike = esp = False + for r in utils.get_all_by_type(e_uci_with_data, 'firewall', 'rule'): + name = e_uci_with_data.get('firewall', r, 'name') + print(name) + if name == 'Allow-IPSec-NAT': + nat = True + elif name == 'Allow-IPSec-IKE': + ike = True + elif name == 'Allow-IPSec-ESP': + esp = True + assert (nat and ipsec and esp) From 37529a866e24f4f52a40ef08655e7229d376e6c2 Mon Sep 17 00:00:00 2001 From: Giacomo Sanchietti Date: Tue, 7 Nov 2023 17:33:15 +0100 Subject: [PATCH 3/6] feat: firewall, add new zone function --- src/nethsec/firewall/__init__.py | 43 ++++++++++++++++++++++++++++---- tests/test_firewall.py | 23 +++++++++++------ 2 files changed, 54 insertions(+), 12 deletions(-) diff --git a/src/nethsec/firewall/__init__.py b/src/nethsec/firewall/__init__.py index ac16afb4..bb71ec5f 100644 --- a/src/nethsec/firewall/__init__.py +++ b/src/nethsec/firewall/__init__.py @@ -13,7 +13,7 @@ from nethsec import utils -def add_to_zone(uci, device, zone): +def add_device_to_zone(uci, device, zone): ''' Add given device to a firewall zone. The device is not added if the firewall zone does not exists @@ -45,7 +45,40 @@ def add_to_zone(uci, device, zone): return None -def add_to_lan(uci, device): +def add_interface_to_zone(uci, interface, zone): + ''' + Add given interface to a firewall zone. + The interface is not added if the firewall zone does not exists + Changes are saved to staging area. + + Arguments: + - uci -- EUci pointer + - interface -- Interface name + - zone -- Firewall zone name + + Returns: + - If the firewall zone exists, the name of the section where the device has been added. + - None, otherwise. + ''' + for section in uci.get("firewall"): + s_type = uci.get("firewall", section) + if s_type == "zone": + zname = uci.get("firewall", section, "name") + if zname == zone: + try: + networks = list(uci.get_all("firewall", section, "network")) + except: + networks = [] + if not interface in networks: + networks.append(interface) + uci.set("firewall", section, "network", networks) + uci.save("firewall") + return section + + return None + + +def add_device_to_lan(uci, device): ''' Shortuct to add a device to lan zone @@ -56,9 +89,9 @@ def add_to_lan(uci, device): Returns: - The name of section or None ''' - return add_to_zone(uci, device, 'lan') + return add_device_to_zone(uci, device, 'lan') -def add_to_wan(uci, device): +def add_device_to_wan(uci, device): ''' Shortuct to add a device to wan zone @@ -69,7 +102,7 @@ def add_to_wan(uci, device): Returns: - The name of the configuration section or None ''' - return add_to_zone(uci, device, 'wan') + return add_device_to_zone(uci, device, 'wan') def add_vpn_interface(uci, name, device, link=""): ''' diff --git a/tests/test_firewall.py b/tests/test_firewall.py index 1557df1f..16fe2b09 100644 --- a/tests/test_firewall.py +++ b/tests/test_firewall.py @@ -187,21 +187,30 @@ def _setup_db(tmp_path): fp.write(templates_db) return EUci(confdir=tmp_path.as_posix()) -def test_add_to_zone(tmp_path): +def test_add_interface_to_zone(tmp_path): u = _setup_db(tmp_path) - z1 = firewall.add_to_zone(u, "vnet1", "lan") + z1 = firewall.add_interface_to_zone(u, "interface1", "lan") + assert z1 == 'lan1' + assert 'interface1' in u.get_all('firewall', 'lan1', 'network') + assert firewall.add_interface_to_zone(u, "interface1", "blue") == None + z1 = firewall.add_interface_to_zone(u, "interface2", "lan") + assert 'interface2' in u.get_all('firewall', 'lan1', 'network') + +def test_add_device_to_zone(tmp_path): + u = _setup_db(tmp_path) + z1 = firewall.add_device_to_zone(u, "vnet1", "lan") assert z1 == 'lan1' assert 'vnet1' in u.get_all('firewall', 'lan1', 'device') - assert firewall.add_to_zone(u, "vnet1", "blue") == None + assert firewall.add_device_to_zone(u, "vnet1", "blue") == None -def test_add_to_lan(tmp_path): +def test_add_device_to_lan(tmp_path): u = _setup_db(tmp_path) - assert firewall.add_to_lan(u, "vnet1") == 'lan1' + assert firewall.add_device_to_lan(u, "vnet1") == 'lan1' assert 'vnet1' in u.get_all('firewall', 'lan1', 'device') -def test_add_to_wan(tmp_path): +def test_add_device_to_wan(tmp_path): u = _setup_db(tmp_path) - assert firewall.add_to_wan(u, "vnet2") == 'wan1f' + assert firewall.add_device_to_wan(u, "vnet2") == 'wan1f' assert 'vnet2' in u.get_all('firewall', 'wan1f', 'device') def test_add_service(tmp_path): From 173086a4cbeaf1c609387924a7cb4eb6cdc0d715 Mon Sep 17 00:00:00 2001 From: Giacomo Sanchietti Date: Tue, 7 Nov 2023 17:33:47 +0100 Subject: [PATCH 4/6] feat: ipsec, add zone function --- src/nethsec/ipsec/__init__.py | 14 ++++++++++++++ tests/test_ipsec.py | 26 +++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/src/nethsec/ipsec/__init__.py b/src/nethsec/ipsec/__init__.py index 714e9aa8..efdb00c7 100644 --- a/src/nethsec/ipsec/__init__.py +++ b/src/nethsec/ipsec/__init__.py @@ -78,3 +78,17 @@ def open_firewall_ports(uci): if not nat_accepted or not ike_accepted or not esp_accepted: uci.save('firewall') + +def add_trusted_interface(uci, interface): + ''' + Add the interface to the 'ipsec' trusted zone. The function also creates the trusted zone, if needed. + + Changes are saved to staging area. + + Arguments: + - uci -- EUci pointer + ''' + if firewall.zone_exists(uci, IPSEC_ZONE): + firewall.add_interface_to_zone(uci, interface, IPSEC_ZONE) + else: + firewall.add_trusted_zone(uci, IPSEC_ZONE, [interface]) diff --git a/tests/test_ipsec.py b/tests/test_ipsec.py index 3154f2fd..c0d9279e 100644 --- a/tests/test_ipsec.py +++ b/tests/test_ipsec.py @@ -56,7 +56,6 @@ def test_open_firewall_ports(e_uci_with_data): nat = ike = esp = False for r in utils.get_all_by_type(e_uci_with_data, 'firewall', 'rule'): name = e_uci_with_data.get('firewall', r, 'name') - print(name) if name == 'Allow-IPSec-NAT': nat = True elif name == 'Allow-IPSec-IKE': @@ -64,3 +63,28 @@ def test_open_firewall_ports(e_uci_with_data): elif name == 'Allow-IPSec-ESP': esp = True assert (nat and ipsec and esp) + +def test_add_trusted_interface(e_uci_with_data): + ipsec.add_trusted_interface(e_uci_with_data, 'ipsec1') + count = 0 + zid = '' + # check the zone has been created + for section in e_uci_with_data.get_all('firewall'): + if e_uci_with_data.get('firewall', section) == 'zone': + if e_uci_with_data.get('firewall', section, 'name') == ipsec.IPSEC_ZONE: + count = count + 1 + zid = section + assert(count == 1) + assert(zid) + assert(e_uci_with_data.get_all('firewall', zid, 'network') == ('ipsec1',)) + # check the zone has not been duplicated + count = 0 + ipsec.add_trusted_interface(e_uci_with_data, 'ipsec2') + for section in e_uci_with_data.get_all('firewall'): + if e_uci_with_data.get('firewall', section) == 'zone': + if e_uci_with_data.get('firewall', section, 'name') == ipsec.IPSEC_ZONE: + count = count + 1 + assert(count == 1) + assert('ipsec1' in e_uci_with_data.get_all('firewall', zid, 'network')) + assert('ipsec2' in e_uci_with_data.get_all('firewall', zid, 'network')) + From aa30c9bf3a705444750fddcf7a7ffe3c22438269 Mon Sep 17 00:00:00 2001 From: Giacomo Sanchietti Date: Tue, 7 Nov 2023 17:45:39 +0100 Subject: [PATCH 5/6] feat: firewall, add remove_interface_from_zone --- src/nethsec/firewall/__init__.py | 31 +++++++++++++++++++++++++++++++ tests/test_firewall.py | 5 +++++ 2 files changed, 36 insertions(+) diff --git a/src/nethsec/firewall/__init__.py b/src/nethsec/firewall/__init__.py index bb71ec5f..906e18ef 100644 --- a/src/nethsec/firewall/__init__.py +++ b/src/nethsec/firewall/__init__.py @@ -78,6 +78,37 @@ def add_interface_to_zone(uci, interface, zone): return None +def remove_interface_from_zone(uci, interface, zone): + ''' + Remove the given interface from a firewall zone. + The operation always succeed if the zone does not exists + + Changes are saved to staging area. + + Arguments: + - uci -- EUci pointer + - interface -- Interface name + - zone -- Firewall zone name + + Returns: + - If the firewall zone exists, the name of the section where the device has been removed. + - None, otherwise. + ''' + + for z in utils.get_all_by_type(uci, 'firewall', 'zone'): + if uci.get('firewall', z, 'name') == zone: + try: + networks = list(uci.get_all("firewall", z, "network")) + except: + networks = [] + if interface in networks: + networks.remove(interface) + uci.set("firewall", z, "network", networks) + uci.save("firewall") + return z + return None + + def add_device_to_lan(uci, device): ''' Shortuct to add a device to lan zone diff --git a/tests/test_firewall.py b/tests/test_firewall.py index 16fe2b09..8b7c1de6 100644 --- a/tests/test_firewall.py +++ b/tests/test_firewall.py @@ -196,6 +196,11 @@ def test_add_interface_to_zone(tmp_path): z1 = firewall.add_interface_to_zone(u, "interface2", "lan") assert 'interface2' in u.get_all('firewall', 'lan1', 'network') +def test_remove_interface_from_zone(tmp_path): + u = _setup_db(tmp_path) + z1 = firewall.remove_interface_from_zone(u, 'interface1', "lan") + assert(not 'interface1' in u.get_all('firewall', 'lan1', 'network')) + def test_add_device_to_zone(tmp_path): u = _setup_db(tmp_path) z1 = firewall.add_device_to_zone(u, "vnet1", "lan") From 20b659bde358cf3e64939813c3e288d23a5f063d Mon Sep 17 00:00:00 2001 From: Giacomo Sanchietti Date: Tue, 7 Nov 2023 18:00:26 +0100 Subject: [PATCH 6/6] feat: release 0.0.14 Expose nethsec.ipsec library --- setup.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 91df2646..56618182 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setup( name = 'nethsec', - version = '0.0.13', + version = '0.0.14', author = 'Giacomo Sanchietti', author_email = 'giacomo.sanchietti@nethesis.it', description = 'Utilities for NethSecurity development', @@ -16,8 +16,7 @@ url = "https://github.com/NethServer/python3-nethsec", license = "GPLv3", package_dir = {'': 'src'}, - packages = ['nethsec', 'nethsec.utils', 'nethsec.firewall', 'nethsec.mwan', 'nethsec.dpi'], - #packages = find_packages(), + packages = ['nethsec', 'nethsec.utils', 'nethsec.firewall', 'nethsec.mwan', 'nethsec.dpi', 'nethsec.ipsec'], requires = [ "pyuci" ], classifiers = [ "Programming Language :: Python :: 3",