Skip to content

Commit

Permalink
Merge pull request #11439 from Honny1/most-used-rules
Browse files Browse the repository at this point in the history
Tool for identifying the most used rules
  • Loading branch information
marcusburghardt authored Feb 29, 2024
2 parents d766d59 + 18bfd53 commit cb599e6
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 5 deletions.
36 changes: 34 additions & 2 deletions build-scripts/profile_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import argparse

try:
from utils.profile_tool import command_stats, command_sub
from utils.profile_tool import command_stats, command_sub, command_most_used_rules
except ImportError:
print("The ssg module could not be found.")
print(
Expand Down Expand Up @@ -250,11 +250,39 @@ def parse_sub_subcommand(subparsers):
)


def parse_most_used_rules_subcommand(subparsers):
parser_most_used_rules = subparsers.add_parser(
"most-used-rules",
description=(
"Generates list of all rules used by the existing profiles. In various formats."
),
help="Generates list of all rules used by the existing profiles.",
)
parser_most_used_rules.add_argument(
"BENCHMARKS",
type=str,
nargs="*",
default=[],
help=(
"Specify XCCDF files or a SCAP source data stream files to act on. "
"If not provided are used control files. e.g.: ~/scap-security-guide/controls"
),
)
parser_most_used_rules.add_argument(
"--format",
default="plain",
choices=["plain", "json", "csv"],
help="Which format to use for output.",
)


def parse_args():
parser = argparse.ArgumentParser(description="Profile statistics and utilities tool")
subparsers = parser.add_subparsers(title="subcommands", dest="subcommand", required=True)

parse_stats_subcommand(subparsers)
parse_sub_subcommand(subparsers)
parse_most_used_rules_subcommand(subparsers)

args = parser.parse_args()

Expand Down Expand Up @@ -287,7 +315,11 @@ def parse_args():
return args


SUBCMDS = dict(stats=command_stats, sub=command_sub)
SUBCMDS = {
"stats": command_stats,
"sub": command_sub,
"most-used-rules": command_most_used_rules,
}


def main():
Expand Down
20 changes: 19 additions & 1 deletion docs/manual/developer/05_tools_and_utilities.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,29 @@ rules selected by another profile, run this command:

```bash
$ ./build-scripts/profile_tool.py sub --profile1 rhel7/profiles/ospp.profile --profile2 rhel7/profiles/pci-dss.profile
````
```

This will result in a new YAML profile containing exclusive rules to the
profile pointed by the `--profile1` option.

The tool can also generate a list of the most used rules contained in profiles from a given data stream or benchmark.

For example, to get a list of the most used rules in the benchmark for `rhel8`, run this command:

```bash
$ ./build-scripts/profile_tool.py most-used-rules build/ssg-rhel8-xccdf.xml
```

Or you can also run this command to get a list of the most used rules in the entire project:

```bash
$ ./build-scripts/profile_tool.py most-used-rules
```

The result will be a list of rules with the number of uses in the profiles.
The list can be generated as plain text, JSON or CVS.
Via the `--format FORMAT` parameter.

## Generating Controls from DISA's XCCDF Files

If you want a control file for product from DISA's XCCDF files you can run the following command:
Expand Down
10 changes: 8 additions & 2 deletions ssg/build_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,15 +807,21 @@ def show_profile_stats(self, profile, options):

return profile_stats

def show_all_profile_stats(self, options):
def _process_all_profile_stats(self, function_to_process_profile, *args):
all_profile_elems = self.tree.findall("./{%s}Profile" % (XCCDF12_NS))
ret = []
for elem in all_profile_elems:
profile = elem.get('id')
if profile is not None:
ret.append(self.show_profile_stats(profile, options))
ret.append(function_to_process_profile(profile, *args))
return ret

def show_all_profile_stats(self, options):
return self._process_all_profile_stats(self.show_profile_stats, options)

def get_all_profile_stats(self):
return self._process_all_profile_stats(self.get_profile_stats)

def console_print(self, content, width):
"""Prints the 'content' array left aligned, each time 45 characters
long, each row 'width' characters wide"""
Expand Down
23 changes: 23 additions & 0 deletions tests/unit/utils/test_generate_most_used_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import os
import sys
import pytest
from argparse import Namespace
from utils.profile_tool import command_most_used_rules

DATA_DIR = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "ssg-module", "data")
)
DATA_STREAM_PATH = os.path.join(DATA_DIR, "simple_data_stream.xml")


def get_fake_args():
return Namespace(
subcommand="most-used-rules", BENCHMARKS=[str(DATA_STREAM_PATH)], format="plain"
)


@pytest.mark.skipif(sys.version_info[0] < 3, reason="requires python3")
def test_command(capsys):
command_most_used_rules(get_fake_args())
captured = capsys.readouterr()
assert "xccdf_com.example.www_rule_test-pass: 1" in captured.out
1 change: 1 addition & 0 deletions utils/profile_tool/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .sub import command_sub
from .stats import command_stats
from .most_used_rules import command_most_used_rules
80 changes: 80 additions & 0 deletions utils/profile_tool/most_used_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import sys
import json

from ssg.build_profile import XCCDFBenchmark


PYTHON_2 = sys.version_info[0] < 3

if not PYTHON_2:
from .profile import get_profile
from ..controleval import (
load_controls_manager,
get_available_products,
get_product_profiles_files,
)


def _count_rules_per_rules_list(rules_list, rules):
for rule in rules_list:
if rule in rules:
rules[rule] += 1
else:
rules[rule] = 1


def _count_rules_per_benchmark(benchmark, rules):
benchmark = XCCDFBenchmark(benchmark)
for profile in benchmark.get_all_profile_stats():
_count_rules_per_rules_list(profile.get("rules", []), rules)


def _get_profiles_for_product(ctrls_mgr, product):
profiles_files = get_product_profiles_files(product)

profiles = []
for file in profiles_files:
profiles.append(get_profile(profiles_files, file, ctrls_mgr.policies))
return profiles


def _process_all_products_from_controls(rules):
if PYTHON_2:
raise Exception("This feature is not supported for python2.")

for product in get_available_products():
controls_manager = load_controls_manager("./controls/", product)
for profile in _get_profiles_for_product(controls_manager, product):
_count_rules_per_rules_list(profile.rules, rules)


def _sorted_rules(rules):
sorted_rules = {
k: v
for k, v in sorted(rules.items(), key=lambda x: x[1], reverse=True)
}
return sorted_rules


def command_most_used_rules(args):
rules = {}

if not args.BENCHMARKS:
_process_all_products_from_controls(rules)
else:
for benchmark in args.BENCHMARKS:
_count_rules_per_benchmark(benchmark, rules)

sorted_rules = _sorted_rules(rules)

f_string = "{}: {}"

if args.format == "json":
print(json.dumps(sorted_rules, indent=4))
return
elif args.format == "csv":
print("rule_id,count_of_profiles")
f_string = "{},{}"

for rule_id, rule_count in sorted_rules.items():
print(f_string.format(rule_id, rule_count))
100 changes: 100 additions & 0 deletions utils/profile_tool/profile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from ..controleval import get_parameter_from_yaml


def _get_extends_profile_path(profiles_files, profile_name):
for profile_path in profiles_files:
if f"{profile_name}.profile" in profile_path:
return profile_path
return None


def _process_extends(profiles_files, file, policies, profile):
extends = get_parameter_from_yaml(file, "extends")
if isinstance(extends, str):
profile_path = _get_extends_profile_path(profiles_files, extends)
if profile_path is None:
raise Exception("There is no Extension '{}' Profile.".format(extends))
profile = get_profile(profiles_files, profile_path, policies, profile)


def _process_selections(file, profile, policies):
selections = get_parameter_from_yaml(file, "selections")
for selected in selections:
if ":" in selected and "=" not in selected:
profile.add_from_policy(policies, selected)
else:
profile.add_rule(selected)
profile.clean_rules()


def get_profile(profiles_files, file, policies, profile=None):
if profile is None:
title = get_parameter_from_yaml(file, "title")
profile = Profile(file, title)

_process_extends(profiles_files, file, policies, profile)

_process_selections(file, profile, policies)
return profile


class Profile:
def __init__(self, path, title):
self.path = path
self.title = title
self.rules = []
self.unselected_rules = []

def add_rule(self, rule_id):
if rule_id.startswith("!"):
self.unselected_rules.append(rule_id)
return
if "=" not in rule_id:
self.rules.append(rule_id)

def add_rules(self, rules):
for rule in rules:
self.add_rule(rule)

def clean_rules(self):
for rule in self.unselected_rules:
rule_ = rule.replace("!", "")
if rule_ in self.rules:
self.rules.remove(rule_)

@staticmethod
def _get_sel(selected):
policy = None
control = None
level = None
if selected.count(":") == 2:
policy, control, level = selected.split(":")
else:
policy, control = selected.split(":")
return policy, control, level

@staticmethod
def _get_levels(policy, level):
levels = [level]
if policy.levels_by_id.get(level).inherits_from is not None:
levels.extend(policy.levels_by_id.get(level).inherits_from)
return levels

def add_from_policy(self, policies, selected):
policy_id, control, level = self._get_sel(selected)
policy = policies[policy_id]

if control != "all":
self.add_rules(policy.controls_by_id[control].rules)
return

if level is None:
for control in policy.controls:
self.add_rules(control.rules)
return

levels = self._get_levels(policy, level)
for control in policy.controls:
intersection = set(control.levels) & set(levels)
if len(intersection) >= 1:
self.add_rules(control.rules)

0 comments on commit cb599e6

Please sign in to comment.