Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fead: add ipsec lib #18

Merged
merged 6 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name = 'nethsec',
version = '0.0.13',
version = '0.0.14',
author = 'Giacomo Sanchietti',
author_email = '[email protected]',
description = 'Utilities for NethSecurity development',
Expand All @@ -16,8 +16,7 @@
url = "https://github.com/NethServer/python3-nethsec",
license = "GPLv3",
package_dir = {'': 'src'},
packages = ['nethsec', 'nethsec.utils', 'nethsec.firewall', 'nethsec.mwan', 'nethsec.dpi'],
#packages = find_packages(),
packages = ['nethsec', 'nethsec.utils', 'nethsec.firewall', 'nethsec.mwan', 'nethsec.dpi', 'nethsec.ipsec'],
requires = [ "pyuci" ],
classifiers = [
"Programming Language :: Python :: 3",
Expand Down
74 changes: 69 additions & 5 deletions src/nethsec/firewall/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from nethsec import utils


def add_to_zone(uci, device, zone):
def add_device_to_zone(uci, device, zone):
'''
Add given device to a firewall zone.
The device is not added if the firewall zone does not exists
Expand Down Expand Up @@ -45,7 +45,71 @@ def add_to_zone(uci, device, zone):

return None

def add_to_lan(uci, device):
def add_interface_to_zone(uci, interface, zone):
'''
Add given interface to a firewall zone.
The interface is not added if the firewall zone does not exists
Changes are saved to staging area.

Arguments:
- uci -- EUci pointer
- interface -- Interface name
- zone -- Firewall zone name

Returns:
- If the firewall zone exists, the name of the section where the device has been added.
- None, otherwise.
'''
for section in uci.get("firewall"):
s_type = uci.get("firewall", section)
if s_type == "zone":
zname = uci.get("firewall", section, "name")
if zname == zone:
try:
networks = list(uci.get_all("firewall", section, "network"))
except:
networks = []
if not interface in networks:
networks.append(interface)
uci.set("firewall", section, "network", networks)
uci.save("firewall")
return section

return None


def remove_interface_from_zone(uci, interface, zone):
'''
Remove the given interface from a firewall zone.
The operation always succeed if the zone does not exists

Changes are saved to staging area.

Arguments:
- uci -- EUci pointer
- interface -- Interface name
- zone -- Firewall zone name

Returns:
- If the firewall zone exists, the name of the section where the device has been removed.
- None, otherwise.
'''

for z in utils.get_all_by_type(uci, 'firewall', 'zone'):
if uci.get('firewall', z, 'name') == zone:
try:
networks = list(uci.get_all("firewall", z, "network"))
except:
networks = []
if interface in networks:
networks.remove(interface)
uci.set("firewall", z, "network", networks)
uci.save("firewall")
return z
return None


def add_device_to_lan(uci, device):
'''
Shortuct to add a device to lan zone

Expand All @@ -56,9 +120,9 @@ def add_to_lan(uci, device):
Returns:
- The name of section or None
'''
return add_to_zone(uci, device, 'lan')
return add_device_to_zone(uci, device, 'lan')

def add_to_wan(uci, device):
def add_device_to_wan(uci, device):
'''
Shortuct to add a device to wan zone

Expand All @@ -69,7 +133,7 @@ def add_to_wan(uci, device):
Returns:
- The name of the configuration section or None
'''
return add_to_zone(uci, device, 'wan')
return add_device_to_zone(uci, device, 'wan')

def add_vpn_interface(uci, name, device, link=""):
'''
Expand Down
94 changes: 94 additions & 0 deletions src/nethsec/ipsec/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#!/usr/bin/python3

#
# Copyright (C) 2023 Nethesis S.r.l.
# SPDX-License-Identifier: GPL-2.0-only
#

'''
IPSec utilities
'''

import os
from nethsec import utils, firewall

IPSEC_ZONE='ipsec'

def init_ipsec(uci):
'''
Initialize IPSec global configuration, if needed.

Changes are saved to staging area.

Arguments:
- uci -- EUci pointer
'''
# Make sure the config file exists
conf = os.path.join(uci.confdir(), 'ipsec')
if not os.path.isfile(conf):
with open(conf, 'a'):
pass

# Setup global options
gsettings = utils.get_id("ipsec_global")
uci.set("ipsec", gsettings, IPSEC_ZONE)
uci.set("ipsec", gsettings, "debug", '0')
uci.set("ipsec", gsettings, "zone", 'ipsec')
uci.set("ipsec", gsettings, "interface", ['wan'])
uci.commit('ipsec')

def open_firewall_ports(uci):
'''
Open firewall ports for IPSec tunnels, if need.

Changes are saved to staging area.

Arguments:
- uci -- EUci pointer
'''
esp_accepted = False
ike_accepted = False
nat_accepted = False
esp = {"src": "wan", "dest_port": "", "proto": "esp", "target": "ACCEPT"}
ike = {"src": "wan", "dest_port": "500", "proto": "udp", "target": "ACCEPT"}
nat = {"src": "wan", "dest_port": "4500", "proto": "udp", "target": "ACCEPT"}
# search for existing rules
for r in utils.get_all_by_type(uci, 'firewall', 'rule'):
tmp = dict()
for opt in ['src', 'dest', 'dest_port', 'proto', 'target']:
tmp[opt] = uci.get('firewall', r, opt, default='')
# check if tmp is the esp rule
if all((tmp.get(k) == v for k, v in esp.items())):
esp_accepted = True
# check if tmp is the ike rule
if all((tmp.get(k) == v for k, v in ike.items())):
ike_accepted = True
# check if tmp is the nat rule
if all((tmp.get(k) == v for k, v in nat.items())):
nat_accepted = True

if not ike_accepted:
firewall.add_template_rule(uci, 'ns_ipsec_ike')

if not esp_accepted:
firewall.add_template_rule(uci, 'ns_ipsec_esp')

if not nat_accepted:
firewall.add_template_rule(uci, 'ns_ipsec_nat')

if not nat_accepted or not ike_accepted or not esp_accepted:
uci.save('firewall')

def add_trusted_interface(uci, interface):
'''
Add the interface to the 'ipsec' trusted zone. The function also creates the trusted zone, if needed.

Changes are saved to staging area.

Arguments:
- uci -- EUci pointer
'''
if firewall.zone_exists(uci, IPSEC_ZONE):
firewall.add_interface_to_zone(uci, interface, IPSEC_ZONE)
else:
firewall.add_trusted_zone(uci, IPSEC_ZONE, [interface])
28 changes: 21 additions & 7 deletions tests/test_firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,21 +187,35 @@ def _setup_db(tmp_path):
fp.write(templates_db)
return EUci(confdir=tmp_path.as_posix())

def test_add_to_zone(tmp_path):
def test_add_interface_to_zone(tmp_path):
u = _setup_db(tmp_path)
z1 = firewall.add_to_zone(u, "vnet1", "lan")
z1 = firewall.add_interface_to_zone(u, "interface1", "lan")
assert z1 == 'lan1'
assert 'interface1' in u.get_all('firewall', 'lan1', 'network')
assert firewall.add_interface_to_zone(u, "interface1", "blue") == None
z1 = firewall.add_interface_to_zone(u, "interface2", "lan")
assert 'interface2' in u.get_all('firewall', 'lan1', 'network')

def test_remove_interface_from_zone(tmp_path):
u = _setup_db(tmp_path)
z1 = firewall.remove_interface_from_zone(u, 'interface1', "lan")
assert(not 'interface1' in u.get_all('firewall', 'lan1', 'network'))

def test_add_device_to_zone(tmp_path):
u = _setup_db(tmp_path)
z1 = firewall.add_device_to_zone(u, "vnet1", "lan")
assert z1 == 'lan1'
assert 'vnet1' in u.get_all('firewall', 'lan1', 'device')
assert firewall.add_to_zone(u, "vnet1", "blue") == None
assert firewall.add_device_to_zone(u, "vnet1", "blue") == None

def test_add_to_lan(tmp_path):
def test_add_device_to_lan(tmp_path):
u = _setup_db(tmp_path)
assert firewall.add_to_lan(u, "vnet1") == 'lan1'
assert firewall.add_device_to_lan(u, "vnet1") == 'lan1'
assert 'vnet1' in u.get_all('firewall', 'lan1', 'device')

def test_add_to_wan(tmp_path):
def test_add_device_to_wan(tmp_path):
u = _setup_db(tmp_path)
assert firewall.add_to_wan(u, "vnet2") == 'wan1f'
assert firewall.add_device_to_wan(u, "vnet2") == 'wan1f'
assert 'vnet2' in u.get_all('firewall', 'wan1f', 'device')

def test_add_service(tmp_path):
Expand Down
90 changes: 90 additions & 0 deletions tests/test_ipsec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import pathlib

import pytest
from euci import EUci
from pytest_mock import MockFixture
from nethsec import ipsec, utils

templates_db = """
config template_rule 'ns_ipsec_esp'
option name 'Allow-IPSec-ESP'
option src 'wan'
option proto 'esp'
option target 'ACCEPT'

config template_rule 'ns_ipsec_ike'
option name 'Allow-IPSec-IKE'
option src 'wan'
option dest_port '500'
option proto 'udp'
option target 'ACCEPT'

config template_rule 'ns_ipsec_nat'
option name 'Allow-IPSec-NAT'
option src 'wan'
option dest_port '500'
option proto 'udp'
option target 'ACCEPT'
"""

@pytest.fixture
def e_uci(tmp_path: pathlib.Path) -> EUci:
conf_dir = tmp_path.joinpath('conf')
conf_dir.mkdir()
save_dir = tmp_path.joinpath('save')
save_dir.mkdir()
return EUci(confdir=conf_dir.as_posix(), savedir=save_dir.as_posix())


@pytest.fixture
def e_uci_with_data(e_uci: EUci):
with pathlib.Path(e_uci.confdir()).joinpath('templates').open('w') as fp:
fp.write(templates_db)
with pathlib.Path(e_uci.confdir()).joinpath('firewall').open('a') as fp:
pass
return e_uci

def test_init_ipsec(e_uci):
ipsec.init_ipsec(e_uci)
assert(e_uci.get('ipsec', 'ns_ipsec_global') == 'ipsec')
assert(e_uci.get('ipsec', 'ns_ipsec_global', 'debug') == '0')
assert(e_uci.get('ipsec', 'ns_ipsec_global', 'zone') == ipsec.IPSEC_ZONE)
assert(e_uci.get_all('ipsec', 'ns_ipsec_global', 'interface') == ('wan',))

def test_open_firewall_ports(e_uci_with_data):
ipsec.open_firewall_ports(e_uci_with_data)
nat = ike = esp = False
for r in utils.get_all_by_type(e_uci_with_data, 'firewall', 'rule'):
name = e_uci_with_data.get('firewall', r, 'name')
if name == 'Allow-IPSec-NAT':
nat = True
elif name == 'Allow-IPSec-IKE':
ike = True
elif name == 'Allow-IPSec-ESP':
esp = True
assert (nat and ipsec and esp)

def test_add_trusted_interface(e_uci_with_data):
ipsec.add_trusted_interface(e_uci_with_data, 'ipsec1')
count = 0
zid = ''
# check the zone has been created
for section in e_uci_with_data.get_all('firewall'):
if e_uci_with_data.get('firewall', section) == 'zone':
if e_uci_with_data.get('firewall', section, 'name') == ipsec.IPSEC_ZONE:
count = count + 1
zid = section
assert(count == 1)
assert(zid)
assert(e_uci_with_data.get_all('firewall', zid, 'network') == ('ipsec1',))
# check the zone has not been duplicated
count = 0
ipsec.add_trusted_interface(e_uci_with_data, 'ipsec2')
for section in e_uci_with_data.get_all('firewall'):
if e_uci_with_data.get('firewall', section) == 'zone':
if e_uci_with_data.get('firewall', section, 'name') == ipsec.IPSEC_ZONE:
count = count + 1
assert(count == 1)
assert('ipsec1' in e_uci_with_data.get_all('firewall', zid, 'network'))
assert('ipsec2' in e_uci_with_data.get_all('firewall', zid, 'network'))

Loading