Skip to content

Commit

Permalink
Example tests for SPCI hackathon
Browse files Browse the repository at this point in the history
  • Loading branch information
victorusu committed Nov 30, 2023
1 parent 20a1a5e commit 9c0fcdf
Show file tree
Hide file tree
Showing 8 changed files with 1,055 additions and 0 deletions.
77 changes: 77 additions & 0 deletions hpctestlib/anssi_high/aide.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright 2016-2023 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause

import reframe as rfm
import reframe.utility.sanity as sn


class SudoCmd(rfm.RegressionMixin):
@run_before('run')
def set_sudo_cmd(self):
self.executable = f'sudo {self.executable}'


@rfm.simple_test
class aide_database_check(rfm.RunOnlyRegressionTest, SudoCmd):
'''Check if aide is installed and is minimally configured'''

executable = 'aide'
executable_opts = ['--check']
tags = {'system', 'anssi', 'aide'}

@run_before('run')
def set_sudo_cmd(self):
self.executable = f'{self.executable}'

@sanity_function
def assert_checks(self):

return sn.all([
sn.assert_not_found('command not found', self.stderr),
sn.assert_not_found('Permission denied', self.stderr),
sn.assert_not_found(r'Couldn\'t open file .* for reading', self.stdout),
sn.assert_found('Summary', self.stdout),
sn.assert_found('aide.db.gz', self.stdout),
sn.assert_found('End timestamp', self.stdout)]
)


@rfm.simple_test
class aide_configured_check(rfm.RunOnlyRegressionTest, SudoCmd):
'''Check if aide was configured with proper rule options'''

#: Parameter listing the aide rules that must have
#: the aide_rule_opts configured
#:
#: :type: :class:`str`
#: :values: ``['NORMAL', 'DIR', 'PERMS', 'LOG',
# 'CONTENT_EX', 'DATAONLY']``
aide_rules = parameter(['NORMAL', 'DIR', 'PERMS', 'LOG',
'CONTENT_EX', 'DATAONLY'])

#: Parameter listing the aide rules options that must have
#: be configured to the
#:
#: :type: :class:`str`
#: :values: ``['NORMAL', 'DIR', 'PERMS', 'LOG',
# 'CONTENT_EX', 'DATAONLY']``
aide_rules_opts = parameter(['ACL', 'SELinux', 'XATTRS'])

executable = 'cat'
executable_opts = ['/etc/aide.conf']
tags = {'system', 'anssi', 'aide'}

@sanity_function
def assert_checks(self):

return sn.all([
sn.assert_not_found('command not found', self.stderr),
sn.assert_not_found('Permission denied', self.stderr),
sn.assert_not_found(r'Couldn\'t open file .* for reading',
self.stdout),
sn.assert_found(rf'^{self.aide_rules}\s?=\s?.*'
rf'{self.aide_rules_opts.lower()}.*', self.stdout)]
)

182 changes: 182 additions & 0 deletions hpctestlib/anssi_high/audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# Copyright 2016-2023 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause

import os

import reframe as rfm
import reframe.utility.sanity as sn
import reframe.utility.typecheck as typ

from reframe.core.exceptions import SanityError


class SkipIfNotRoot(rfm.RegressionMixin):
@run_after('init')
def skip_if_not_root(self):
self.skip_if(os.getuid() != 0,
msg='Skipping test because it has be executed as root')


class SkipIfNotLocal(rfm.RegressionMixin):
@run_before('run')
def skip_if_not_local(self):
self.skip_if(not self.is_local,
msg="Skipping the test because it is not local")


@rfm.simple_test
class audit_rules_persistency_check(rfm.RunOnlyRegressionTest,
SkipIfNotRoot, SkipIfNotLocal):
'''Check if the audit rules cannot be changed'''

executable = 'echo'
executable_opts = ['done']
audit_rules_file = '/etc/audit/rules.d/audit.rules'
tags = {'system', 'anssi', 'audit'}

@sanity_function
def assert_checks(self):
value = sn.extractall(r'^-e[\s]+(?P<value>\d+)', self.audit_rules_file,
'value', int)

return sn.all([
sn.assert_not_found('command not found', self.stderr),
sn.assert_not_found('Permission denied', self.stderr),
sn.assert_true(value, msg='-e 2 not found in the '
f'{self.audit_rules_file} file'),
sn.assert_eq(sn.count(value), 1,
msg=f'There are multiple entries to -e flag'),
sn.assert_eq(value[0], 2,
msg=f'The -e flag should be set to 2'),
])


@rfm.simple_test
class audit_immutable_rules_check(audit_rules_persistency_check):
audit_rules_file = '/etc/audit/rules.d/immutable.rules'


@rfm.simple_test
class audit_monitor_path_rules_check(rfm.RunOnlyRegressionTest,
SkipIfNotRoot, SkipIfNotLocal):
'''Check if the audit rules cannot be changed'''

#: Audit path rules to be monitored
#:
#: :type: `Dict[str, str]`. The key should be the path to a file or folder.
#: and the value should be a string with the audit rule options.
#: E.g., '{"/etc/selinux": "key=MAC-policy,perm=rwa"}'
audit_path_rules = variable(typ.Dict[str,str])

executable = 'echo'
executable_opts = ['done']
audit_rules_file = '/etc/audit/rules.d'
tags = {'system', 'anssi', 'audit'}

def get_all_files(self):
files = []
for (dirpath, _, filenames) in os.walk(self.audit_rules_file):
for f in filenames:
files.append(os.path.join(dirpath, f))

return files

def explode_rules_str(self, opts_str):
result = {}
for opt in opts_str.split(','):
if opt == '':
continue

opt_parts = opt.split('=', maxsplit=2)
keystr = opt_parts[0]
valstr = opt_parts[1] if len(opt_parts) > 1 else ''
result[keystr] = valstr

return result

def is_key_ne_item_dict(self, key, item, dictionary):
if key and key in dictionary:
if item.group(key) == dictionary[key]:
return False
return True

def incr_counter(self, key, dictionary):
if key:
if key in dictionary:
dictionary[key] += 1
else:
dictionary[key] = 1

def process_rules(self, found_rules, error_msgs, monitored_files,
tobe_monitored_files):
for found_rule in found_rules:
self.incr_counter(found_rule.group('path'), monitored_files)

for audit_item, audit_rules in self.audit_path_rules.items():
rules = self.explode_rules_str(audit_rules)

if audit_item == found_rule.group('path'):
self.incr_counter(found_rule.group('path'),
tobe_monitored_files)
if self.is_key_ne_item_dict('key', found_rule, rules):
error_msgs.add('audit keyname not correct for path '
f'{audit_item}')
if self.is_key_ne_item_dict('perm', found_rule, rules):
error_msgs.add('audit permission not correct for path '
f'{audit_item}')


def evaluate_sanity(self, error_msgs, monitored_files, tobe_monitored_files):
result = []
nl = '\n'

all_sanities = sn.chain([
sn.assert_not_found('command not found', self.stderr),
sn.assert_not_found('Permission denied', self.stderr),
sn.assert_ne(sn.count(error_msgs), 0,
msg=f'{self.audit_path_rules} is not being monitored '
'via audit'),
sn.assert_eq(sn.count(error_msgs), 0,
msg=f'{nl.join(error_msgs)}')],
sn.map(lambda x: sn.assert_in(x, tobe_monitored_files,
msg=f'File {x} is not being '
'monitored'),
self.audit_path_rules),
sn.map(lambda x: sn.assert_eq(x[1], 1,
msg=f'File {x[0]} is monitored {x[1]}'
' times'),
monitored_files.items()),
)

for sanity in all_sanities:
try:
sn.evaluate(sanity)
except SanityError as e:
result.append(str(e))

return result


@sanity_function
def assert_checks(self):
error_msgs = set()
monitored_files = {}
tobe_monitored_files = {}
files = self.get_all_files()

for file in files:
found_rules = sn.evaluate(
sn.findall(
r'^(?!#)'
r'(?=.*(?:-F key=|-k\s+)(?P<key>[\S+]*))?'
r'(?=.*(?:-F perm=|-p\s+)(?P<perm>[\S+]*))?'
r'(?=.*(?:-F dir=|-F path=|-w\s+)(?P<path>[\S+]*))?'
r'.*$',file))

self.process_rules(found_rules, error_msgs,
monitored_files, tobe_monitored_files)

result = self.evaluate_sanity(error_msgs, monitored_files, tobe_monitored_files)
return sn.assert_false(result, msg='\n'+'\n'.join(result))
71 changes: 71 additions & 0 deletions hpctestlib/anssi_high/dnf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright 2016-2023 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause

import os

import reframe as rfm
import reframe.utility.sanity as sn


class SkipIfNotRoot(rfm.RegressionMixin):
@run_after('init')
def skip_if_not_root(self):
self.skip_if(os.getuid() != 0,
msg='Skipping test because it has be executed as root')


@rfm.simple_test
class packages_updated_check(rfm.RunOnlyRegressionTest, SkipIfNotRoot):
'''Ensure there are no security updates'''

executable = 'dnf'
executable_opts = ['update', '--security', '<<<N']
tags = {'system', 'anssi', 'packages', 'dnf'}

@sanity_function
def assert_checks(self):
return sn.all([
sn.assert_not_found('command not found', self.stderr),
sn.assert_not_found('Permission denied', self.stderr),
sn.assert_found('Dependencies resolved.', self.stdout),
sn.assert_found('Nothing to do.', self.stdout),
sn.assert_found('Complete!', self.stdout),
])


@rfm.simple_test
class needs_reboot_check(rfm.RunOnlyRegressionTest, SkipIfNotRoot):
'''Check if the system needs rebooting'''

executable = 'needs-restarting'
executable_opts = ['-r']
tags = {'system', 'anssi', 'packages', 'dnf'}

@sanity_function
def assert_checks(self):
return sn.all([
sn.assert_not_found('command not found', self.stderr),
sn.assert_not_found('Permission denied', self.stderr),
sn.assert_found('Reboot should not be necessary.', self.stdout,
msg='System requires a reboot after update'),
])


@rfm.simple_test
class gpg_enabled_check(rfm.RunOnlyRegressionTest, SkipIfNotRoot):
'''Check if all repos have GPG enabled'''

executable = 'cat'
executable_opts = ['/etc/yum.conf', '/etc/yum.repos.d/*']
tags = {'system', 'anssi', 'packages', 'dnf'}

@sanity_function
def assert_checks(self):
return sn.all([
sn.assert_not_found('command not found', self.stderr),
sn.assert_not_found('Permission denied', self.stderr),
sn.assert_not_found(r'^gpgcheck[\s]*=[\s]*0', self.stdout,
msg='Found repo with gpgpcheck=0'),
])
62 changes: 62 additions & 0 deletions hpctestlib/anssi_high/ebpf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright 2016-2023 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause

import json
import os

import reframe as rfm
import reframe.utility.sanity as sn
import reframe.utility.typecheck as typ

from reframe.core.exceptions import SanityError


class SkipIfNotRoot(rfm.RegressionMixin):
@run_after('init')
def skip_if_not_root(self):
self.skip_if(os.getuid() != 0,
msg='Skipping test because it has be executed as root')


@rfm.simple_test
class ebpf_prog_list_check(rfm.RunOnlyRegressionTest, SkipIfNotRoot):
'''Ensure ebpf is functional'''

#: eBPF program tags
#:
#: :type: `List[str]`. The list should contain all the whitelist of
#: ebpf program tags
ebpf_tags = variable(typ.List[str])

executable = 'bpftool'
executable_opts = ['prog', 'list', '--json', '--pretty']
tags = {'system', 'anssi', 'ebpf'}

@run_before('sanity')
def skip_if_command_not_found(self):
stderr = os.path.join(self.stagedir, sn.evaluate(self.stderr))
try:
sn.evaluate(sn.assert_not_found('command not found', stderr))
except SanityError as e:
self.skip('bpftool is not installed')

@run_before('sanity')
def read_output_json(self):
self.ebpf_json = None
stdout = os.path.join(self.stagedir, sn.evaluate(self.stdout))
with open(stdout, 'r') as fp:
self.ebpf_json = json.load(fp)

@sanity_function
def assert_checks(self):
return sn.all(sn.chain(
[sn.assert_not_found('Permission denied', self.stderr)],
sn.map(lambda x: sn.assert_in(x['tag'], self.ebpf_tags,
msg='found unknown ebpf program '
f'"{x["name"]}" with '
f'id "{x["id"]}" and '
f'tag "{x["tag"]}"'),
self.ebpf_json),
))
Loading

0 comments on commit 9c0fcdf

Please sign in to comment.