diff --git a/build-scripts/profile_tool.py b/build-scripts/profile_tool.py index 5ca46bd5618..4825cdb5ded 100755 --- a/build-scripts/profile_tool.py +++ b/build-scripts/profile_tool.py @@ -5,7 +5,7 @@ import argparse try: - from utils.profile_tool import command_stats, command_sub + from utils.profile_tool import command_stats, command_sub, command_most_used_rules except ImportError: print("The ssg module could not be found.") print( @@ -250,11 +250,39 @@ def parse_sub_subcommand(subparsers): ) +def parse_most_used_rules_subcommand(subparsers): + parser_most_used_rules = subparsers.add_parser( + "most-used-rules", + description=( + "Generates list of all rules used by the existing profiles. In various formats." + ), + help="Generates list of all rules used by the existing profiles.", + ) + parser_most_used_rules.add_argument( + "BENCHMARKS", + type=str, + nargs="*", + default=[], + help=( + "Specify XCCDF files or a SCAP source data stream files to act on. " + "If not provided are used control files. e.g.: ~/scap-security-guide/controls" + ), + ) + parser_most_used_rules.add_argument( + "--format", + default="plain", + choices=["plain", "json", "csv"], + help="Which format to use for output.", + ) + + def parse_args(): parser = argparse.ArgumentParser(description="Profile statistics and utilities tool") subparsers = parser.add_subparsers(title="subcommands", dest="subcommand", required=True) + parse_stats_subcommand(subparsers) parse_sub_subcommand(subparsers) + parse_most_used_rules_subcommand(subparsers) args = parser.parse_args() @@ -287,7 +315,11 @@ def parse_args(): return args -SUBCMDS = dict(stats=command_stats, sub=command_sub) +SUBCMDS = { + "stats": command_stats, + "sub": command_sub, + "most-used-rules": command_most_used_rules, +} def main(): diff --git a/docs/manual/developer/05_tools_and_utilities.md b/docs/manual/developer/05_tools_and_utilities.md index 8026299b44a..8ec39032574 100644 --- a/docs/manual/developer/05_tools_and_utilities.md +++ b/docs/manual/developer/05_tools_and_utilities.md @@ -38,11 +38,29 @@ rules selected by another profile, run this command: ```bash $ ./build-scripts/profile_tool.py sub --profile1 rhel7/profiles/ospp.profile --profile2 rhel7/profiles/pci-dss.profile -```` +``` This will result in a new YAML profile containing exclusive rules to the profile pointed by the `--profile1` option. +The tool can also generate a list of the most used rules contained in profiles from a given data stream or benchmark. + +For example, to get a list of the most used rules in the benchmark for `rhel8`, run this command: + +```bash + $ ./build-scripts/profile_tool.py most-used-rules build/ssg-rhel8-xccdf.xml +``` + +Or you can also run this command to get a list of the most used rules in the entire project: + +```bash + $ ./build-scripts/profile_tool.py most-used-rules +``` + +The result will be a list of rules with the number of uses in the profiles. +The list can be generated as plain text, JSON or CVS. +Via the `--format FORMAT` parameter. + ## Generating Controls from DISA's XCCDF Files If you want a control file for product from DISA's XCCDF files you can run the following command: diff --git a/ssg/build_profile.py b/ssg/build_profile.py index aee33e21940..48de853691b 100644 --- a/ssg/build_profile.py +++ b/ssg/build_profile.py @@ -807,15 +807,21 @@ def show_profile_stats(self, profile, options): return profile_stats - def show_all_profile_stats(self, options): + def _process_all_profile_stats(self, function_to_process_profile, *args): all_profile_elems = self.tree.findall("./{%s}Profile" % (XCCDF12_NS)) ret = [] for elem in all_profile_elems: profile = elem.get('id') if profile is not None: - ret.append(self.show_profile_stats(profile, options)) + ret.append(function_to_process_profile(profile, *args)) return ret + def show_all_profile_stats(self, options): + return self._process_all_profile_stats(self.show_profile_stats, options) + + def get_all_profile_stats(self): + return self._process_all_profile_stats(self.get_profile_stats) + def console_print(self, content, width): """Prints the 'content' array left aligned, each time 45 characters long, each row 'width' characters wide""" diff --git a/tests/unit/utils/test_generate_most_used_rules.py b/tests/unit/utils/test_generate_most_used_rules.py new file mode 100644 index 00000000000..d213fd6c0ea --- /dev/null +++ b/tests/unit/utils/test_generate_most_used_rules.py @@ -0,0 +1,23 @@ +import os +import sys +import pytest +from argparse import Namespace +from utils.profile_tool import command_most_used_rules + +DATA_DIR = os.path.abspath( + os.path.join(os.path.dirname(__file__), "..", "ssg-module", "data") +) +DATA_STREAM_PATH = os.path.join(DATA_DIR, "simple_data_stream.xml") + + +def get_fake_args(): + return Namespace( + subcommand="most-used-rules", BENCHMARKS=[str(DATA_STREAM_PATH)], format="plain" + ) + + +@pytest.mark.skipif(sys.version_info[0] < 3, reason="requires python3") +def test_command(capsys): + command_most_used_rules(get_fake_args()) + captured = capsys.readouterr() + assert "xccdf_com.example.www_rule_test-pass: 1" in captured.out diff --git a/utils/profile_tool/__init__.py b/utils/profile_tool/__init__.py index 016d023e6b1..2b2f56c25ee 100644 --- a/utils/profile_tool/__init__.py +++ b/utils/profile_tool/__init__.py @@ -1,2 +1,3 @@ from .sub import command_sub from .stats import command_stats +from .most_used_rules import command_most_used_rules diff --git a/utils/profile_tool/most_used_rules.py b/utils/profile_tool/most_used_rules.py new file mode 100644 index 00000000000..c8072ed3bb1 --- /dev/null +++ b/utils/profile_tool/most_used_rules.py @@ -0,0 +1,80 @@ +import sys +import json + +from ssg.build_profile import XCCDFBenchmark + + +PYTHON_2 = sys.version_info[0] < 3 + +if not PYTHON_2: + from .profile import get_profile + from ..controleval import ( + load_controls_manager, + get_available_products, + get_product_profiles_files, + ) + + +def _count_rules_per_rules_list(rules_list, rules): + for rule in rules_list: + if rule in rules: + rules[rule] += 1 + else: + rules[rule] = 1 + + +def _count_rules_per_benchmark(benchmark, rules): + benchmark = XCCDFBenchmark(benchmark) + for profile in benchmark.get_all_profile_stats(): + _count_rules_per_rules_list(profile.get("rules", []), rules) + + +def _get_profiles_for_product(ctrls_mgr, product): + profiles_files = get_product_profiles_files(product) + + profiles = [] + for file in profiles_files: + profiles.append(get_profile(profiles_files, file, ctrls_mgr.policies)) + return profiles + + +def _process_all_products_from_controls(rules): + if PYTHON_2: + raise Exception("This feature is not supported for python2.") + + for product in get_available_products(): + controls_manager = load_controls_manager("./controls/", product) + for profile in _get_profiles_for_product(controls_manager, product): + _count_rules_per_rules_list(profile.rules, rules) + + +def _sorted_rules(rules): + sorted_rules = { + k: v + for k, v in sorted(rules.items(), key=lambda x: x[1], reverse=True) + } + return sorted_rules + + +def command_most_used_rules(args): + rules = {} + + if not args.BENCHMARKS: + _process_all_products_from_controls(rules) + else: + for benchmark in args.BENCHMARKS: + _count_rules_per_benchmark(benchmark, rules) + + sorted_rules = _sorted_rules(rules) + + f_string = "{}: {}" + + if args.format == "json": + print(json.dumps(sorted_rules, indent=4)) + return + elif args.format == "csv": + print("rule_id,count_of_profiles") + f_string = "{},{}" + + for rule_id, rule_count in sorted_rules.items(): + print(f_string.format(rule_id, rule_count)) diff --git a/utils/profile_tool/profile.py b/utils/profile_tool/profile.py new file mode 100644 index 00000000000..e73b1145773 --- /dev/null +++ b/utils/profile_tool/profile.py @@ -0,0 +1,100 @@ +from ..controleval import get_parameter_from_yaml + + +def _get_extends_profile_path(profiles_files, profile_name): + for profile_path in profiles_files: + if f"{profile_name}.profile" in profile_path: + return profile_path + return None + + +def _process_extends(profiles_files, file, policies, profile): + extends = get_parameter_from_yaml(file, "extends") + if isinstance(extends, str): + profile_path = _get_extends_profile_path(profiles_files, extends) + if profile_path is None: + raise Exception("There is no Extension '{}' Profile.".format(extends)) + profile = get_profile(profiles_files, profile_path, policies, profile) + + +def _process_selections(file, profile, policies): + selections = get_parameter_from_yaml(file, "selections") + for selected in selections: + if ":" in selected and "=" not in selected: + profile.add_from_policy(policies, selected) + else: + profile.add_rule(selected) + profile.clean_rules() + + +def get_profile(profiles_files, file, policies, profile=None): + if profile is None: + title = get_parameter_from_yaml(file, "title") + profile = Profile(file, title) + + _process_extends(profiles_files, file, policies, profile) + + _process_selections(file, profile, policies) + return profile + + +class Profile: + def __init__(self, path, title): + self.path = path + self.title = title + self.rules = [] + self.unselected_rules = [] + + def add_rule(self, rule_id): + if rule_id.startswith("!"): + self.unselected_rules.append(rule_id) + return + if "=" not in rule_id: + self.rules.append(rule_id) + + def add_rules(self, rules): + for rule in rules: + self.add_rule(rule) + + def clean_rules(self): + for rule in self.unselected_rules: + rule_ = rule.replace("!", "") + if rule_ in self.rules: + self.rules.remove(rule_) + + @staticmethod + def _get_sel(selected): + policy = None + control = None + level = None + if selected.count(":") == 2: + policy, control, level = selected.split(":") + else: + policy, control = selected.split(":") + return policy, control, level + + @staticmethod + def _get_levels(policy, level): + levels = [level] + if policy.levels_by_id.get(level).inherits_from is not None: + levels.extend(policy.levels_by_id.get(level).inherits_from) + return levels + + def add_from_policy(self, policies, selected): + policy_id, control, level = self._get_sel(selected) + policy = policies[policy_id] + + if control != "all": + self.add_rules(policy.controls_by_id[control].rules) + return + + if level is None: + for control in policy.controls: + self.add_rules(control.rules) + return + + levels = self._get_levels(policy, level) + for control in policy.controls: + intersection = set(control.levels) & set(levels) + if len(intersection) >= 1: + self.add_rules(control.rules)