Skip to content

Commit

Permalink
add smart remediation system tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Bertschy <[email protected]>
  • Loading branch information
matthyx committed Mar 25, 2024
1 parent 514db0d commit 1934417
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 5 deletions.
18 changes: 18 additions & 0 deletions configurations/k8s_workloads/smart-remediation/c0016-fixed.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
spec:
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx
securityContext:
allowPrivilegeEscalation: false
18 changes: 18 additions & 0 deletions configurations/k8s_workloads/smart-remediation/c0017-fixed.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
spec:
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx
securityContext:
readOnlyRootFilesystem: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
spec:
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx
4 changes: 4 additions & 0 deletions configurations/system/tests.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from configurations.system.tests_cases.network_policy_tests import NetworkPolicyTests
from configurations.system.tests_cases.smart_remediation_tests import SmartRemediationTests
from configurations.system.tests_cases.synchronizer_tests import SynchronizerTests
from systest_utils import TestUtil

Expand All @@ -22,6 +23,7 @@ def all_tests_names():
tests.extend(TestUtil.get_class_methods(RelevantVulnerabilityScanningTests))
tests.extend(TestUtil.get_class_methods(NetworkPolicyTests))
tests.extend(TestUtil.get_class_methods(NotificationSTests))
tests.extend(TestUtil.get_class_methods(SmartRemediationTests))
tests.extend(TestUtil.get_class_methods(SynchronizerTests))
return tests

Expand All @@ -44,6 +46,8 @@ def get_test(test_name):
return NetworkPolicyTests().__getattribute__(test_name)()
if test_name in TestUtil.get_class_methods(NotificationSTests):
return NotificationSTests().__getattribute__(test_name)()
if test_name in TestUtil.get_class_methods(SmartRemediationTests):
return SmartRemediationTests().__getattribute__(test_name)()
if test_name in TestUtil.get_class_methods(SynchronizerTests):
return SynchronizerTests().__getattribute__(test_name)()

Expand Down
31 changes: 31 additions & 0 deletions configurations/system/tests_cases/smart_remediation_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import inspect

from .structures import TestConfiguration
from systest_utils import statics


class SmartRemediationTests(object):

# C-0016 - Allow privilege escalation
@staticmethod
def smart_remediation_c0016():
from tests_scripts.helm.smart_remediation import SmartRemediation
from os.path import join
return TestConfiguration(
name=inspect.currentframe().f_code.co_name,
control="C-0016",
workload=join(statics.DEFAULT_SMART_REMEDIATION_PATH, "nginx-deployment.yaml"),
workload_fix=join(statics.DEFAULT_SMART_REMEDIATION_PATH, "c0016-fixed.yaml"),
test_obj=SmartRemediation)

# C-0017 - Immutable container filesystem
@staticmethod
def smart_remediation_c0017():
from tests_scripts.helm.smart_remediation import SmartRemediation
from os.path import join
return TestConfiguration(
name=inspect.currentframe().f_code.co_name,
control="C-0017",
workload=join(statics.DEFAULT_SMART_REMEDIATION_PATH, "nginx-deployment.yaml"),
workload_fix=join(statics.DEFAULT_SMART_REMEDIATION_PATH, "c0017-fixed.yaml"),
test_obj=SmartRemediation)
17 changes: 14 additions & 3 deletions infrastructure/backend_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2152,9 +2152,20 @@ def get_vuln_v2_images(self, body: dict, expected_results: int = 0):

def get_vuln_v2_components(self, body: dict, expected_results: int = 0):
return self.post_list_request(API_VULNERABILITY_V2_COMPONENT, body, expected_results)




def get_posture_resources_highlights(self, body: dict):
r = self.post(API_POSTURE_RESOURCES + '/highlights',
params={"smEnabled": "true", "customerGUID": self.customer_guid},
json=body)
if not 200 <= r.status_code < 300:
raise Exception(
'Error accessing smart remediation. Request: results of posture resources highlights "%s" (code: %d, message: %s)' % (
self.customer, r.status_code, r.text))
j = r.json()
if not j:
raise Exception('Request: results of posture resources highlights is empty body: %s' % body)
return j

class Solution(object):
"""docstring for Solution"""

Expand Down
32 changes: 32 additions & 0 deletions system_test_mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,38 @@
"description": "",
"skip_on_environment": ""
},
"smart_remediation_c0016": {
"target": [
"In cluster",
"Backend"
],
"target_repositories": [
"helm-chart",
"node-agent",
"storage",
"cadashboardbe",
"event-ingester-service",
"synchronizer"
],
"description": "Checks smart remediation C0016 flow",
"skip_on_environment": ""
},
"smart_remediation_c0017": {
"target": [
"In cluster",
"Backend"
],
"target_repositories": [
"helm-chart",
"node-agent",
"storage",
"cadashboardbe",
"event-ingester-service",
"synchronizer"
],
"description": "Checks smart remediation C0017 flow",
"skip_on_environment": ""
},
"synchronizer": {
"target": [
"In cluster",
Expand Down
3 changes: 3 additions & 0 deletions systest_utils/statics.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
DEFAULT_NAMESPACE_PATH = os.path.join(DEFAULT_K8S_PATHS, 'namespaces')
DEFAULT_CONFIGMAP_PATH = os.path.join(DEFAULT_K8S_PATHS, 'config-map')

# smart remediation tests
DEFAULT_SMART_REMEDIATION_PATH = os.path.join(DEFAULT_K8S_PATHS, 'smart-remediation')

# synchronizer tests
DEFAULT_SYNCHRONIZER_PATH = os.path.join(DEFAULT_K8S_PATHS, 'synchronizer')
DEFAULT_SYNCHRONIZER_CRDS_PATH = os.path.abspath(os.path.join('configurations', 'kubescape-crds', 'supported'))
Expand Down
122 changes: 122 additions & 0 deletions tests_scripts/helm/smart_remediation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
from .base_helm import BaseHelm
from ..kubescape.base_kubescape import BaseKubescape
from systest_utils import statics, Logger, TestUtil


class SmartRemediation(BaseKubescape, BaseHelm):
def __init__(
self, test_obj=None, backend=None, kubernetes_obj=None, test_driver=None
):
super(SmartRemediation, self).__init__(
test_driver=test_driver,
test_obj=test_obj,
backend=backend,
kubernetes_obj=kubernetes_obj,
)

self.helm_kwargs = {
"capabilities.relevancy": "enable",
"capabilities.configurationScan": "enable",
"capabilities.continuousScan": "disable",
"capabilities.nodeScan": "disable",
"capabilities.vulnerabilityScan": "disable",
"capabilities.runtimeObservability": "enable",
"grypeOfflineDB.enabled": "false",
}

test_helm_kwargs = self.test_obj.get_arg("helm_kwargs")
if test_helm_kwargs:
self.helm_kwargs.update(test_helm_kwargs)

def cleanup(self, **kwargs):
super().cleanup(**kwargs)
return statics.SUCCESS, ""

def check_smart_remediation(self, body, want=True, retries=0):
for _ in range(retries):
hl = self.backend.get_posture_resources_highlights(body)
if len(hl["response"]) > 0 and (want == ("smartRemediations" in hl["response"][0])):
return True
TestUtil.sleep(10, "wait for smart remediation")
return False

def start(self):
"""
Test plan:
1. Install Helm chart
2. Apply workload
...
"""
assert (
self.backend is not None
), f"the test {self.test_driver.test_name} must run with backend"

cluster, namespace = self.setup(apply_services=False)
print("Debug: cluster: ", cluster)

Logger.logger.info(f"1. Install Helm Chart")
self.add_and_upgrade_armo_to_repo()
self.install_armo_helm_chart(helm_kwargs=self.helm_kwargs)
self.verify_running_pods(
namespace=statics.CA_NAMESPACE_FROM_HELM_NAME, timeout=360
)

Logger.logger.info(f"2. Apply workload")
workload = self.apply_yaml_file(
yaml_file=self.test_obj["workload"], namespace=namespace
)
self.verify_all_pods_are_running(
namespace=namespace, workload=workload, timeout=180
)

Logger.logger.info(f"3. Trigger a scan")
self.backend.trigger_posture_scan(
cluster_name=cluster,
framework_list=["AllControls"],
with_host_sensor="false",
)

Logger.logger.info(f"3.1. Get report guid")
report_guid = self.get_report_guid(
cluster_name=cluster, wait_to_result=True, framework_name="AllControls"
)
assert report_guid != "", "report guid is empty"

Logger.logger.info(f"4. Check smart remediation is available")
body = {"pageNum": 1, "pageSize": 1, "cursor": "", "orderBy": "", "innerFilters": [{
"resourceID": "apps/v1/" + namespace + "/Deployment/" + workload["metadata"]["name"],
"controlID": self.test_obj["control"],
"reportGUID": report_guid,
"frameworkName": "AllControls"
}]}
assert self.check_smart_remediation(body, retries=15), "smartRemediations is not found"

Logger.logger.info(f"5. Correct the issue")
workload_fix = self.apply_yaml_file(
yaml_file=self.test_obj["workload_fix"], namespace=namespace, patch=True
)
self.verify_all_pods_are_running(namespace=namespace, workload=workload_fix, timeout=180)

Logger.logger.info(f"6. Trigger another scan")
self.backend.trigger_posture_scan(
cluster_name=cluster,
framework_list=["AllControls"],
with_host_sensor="false",
)

Logger.logger.info(f"6.1. Get report guid")
report_guid = self.get_report_guid(
cluster_name=cluster, wait_to_result=True, framework_name="AllControls"
)
assert report_guid != "", "report guid is empty"

Logger.logger.info(f"7. Check the issue is resolved")
body = {"pageNum": 1, "pageSize": 1, "cursor": "", "orderBy": "", "innerFilters": [{
"resourceID": "apps/v1/" + namespace + "/Deployment/" + workload["metadata"]["name"],
"controlID": self.test_obj["control"],
"reportGUID": report_guid,
"frameworkName": "AllControls"
}]}
assert self.check_smart_remediation(body, want=False, retries=15), "smartRemediations should be empty"

return self.cleanup()
7 changes: 5 additions & 2 deletions tests_scripts/kubernetes/base_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def apply_directory(self, path: str, **kwargs):

def apply_yaml_file(self, yaml_file, namespace: str, path: str = statics.DEFAULT_DEPLOYMENT_PATH,
unique_name: bool = False, name: str = None, wlid: str = None, auto_attach: bool = False,
auto_protect: bool = False, **kwargs):
auto_protect: bool = False, patch: bool = False, **kwargs):
"""
currently supports yaml with *one* workload
for applying more than one workload the yaml_file should be a list of files
Expand Down Expand Up @@ -349,7 +349,10 @@ def apply_yaml_file(self, yaml_file, namespace: str, path: str = statics.DEFAULT
statics.AUTO_ATTACH_SECRET_VALUE})

self.update_workload_metadata_env(workload=workload, env=self.env)
self.apply_workload(workload=workload, namespace=namespace, **kwargs)
if patch:
self.kubernetes_obj.patch_workload(application=workload, namespace=namespace, kind=workload["kind"], name=workload["metadata"]["name"])
else:
self.apply_workload(workload=workload, namespace=namespace, **kwargs)
return workload

def apply_workload(self, workload, namespace: str, wait: int = 0):
Expand Down

0 comments on commit 1934417

Please sign in to comment.