Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add account ID extraction and cleanup functionality in CSPM tests #600

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion infrastructure/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import time
from urllib.parse import urlparse, parse_qs
from systest_utils import Logger
import re



class CloudFormationManager:
Expand Down Expand Up @@ -120,4 +122,18 @@ def delete_stack(self):

except ClientError as e:
Logger.logger.error(f"An error occurred while deleting the stack: {e}")
raise e
raise e




def extract_account_id(arn):
"""
Extracts the AWS account ID from an ARN string.

:param arn: The ARN string (e.g., "arn:aws:iam::12345678:role/armo-scan-role-cross-with_customer-12345678")
:return: The extracted account ID as a string or None if not found.
"""
match = re.search(r"arn:aws:iam::(\d+):", arn)
return match.group(1) if match else None

2 changes: 1 addition & 1 deletion system_test_mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -1202,7 +1202,7 @@
"event-ingester-service"
],
"description": "Checks accounts cspm",
"skip_on_environment": "",
"skip_on_environment": "production-us",
"owner": "[email protected]"
},
"clusters": {
Expand Down
2 changes: 1 addition & 1 deletion tests_scripts/accounts/clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def __init__(self, test_obj=None, backend=None, kubernetes_obj=None, test_driver
"capabilities.runtimeObservability": "disable",
"capabilities.networkPolicyService": "disable",
"capabilities.seccompProfileService": "disable",
"capabilities.nodeProfileService": "disable",
"capabilities.nodeProfileService": "enable",
"capabilities.vulnerabilityScan": "disable",
"grypeOfflineDB.enabled": "false",
"capabilities.relevancy": "disabled",
Expand Down
44 changes: 42 additions & 2 deletions tests_scripts/accounts/cspm.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class CSPM(Accounts):
def __init__(self, test_obj=None, backend=None, kubernetes_obj=None, test_driver=None):
super().__init__(test_driver=test_driver, test_obj=test_obj, backend=backend, kubernetes_obj=kubernetes_obj)

self.stack_manager = None




Expand All @@ -55,6 +57,7 @@ def start(self):
"""

assert self.backend is not None, f'the test {self.test_driver.test_name} must run with backend'

stack_region = "us-east-1"
# generate random number for cloud account name for uniqueness
rand = str(random.randint(10000000, 99999999))
Expand Down Expand Up @@ -84,6 +87,10 @@ def start(self):
Logger.logger.info('Stage 3: Create bad arn cloud account with cspm')
self.create_and_validate_cloud_account_with_cspm(cloud_account_name, bad_arn, PROVIDER_AWS, region=stack_region, expect_failure=True)

account_id = aws.extract_account_id(test_arn)
self.cleanup_existing_aws_cloud_accounts(account_id)


Logger.logger.info('Stage 4: Create new arn cloud account with cspm')
self.create_and_validate_cloud_account_with_cspm(cloud_account_name, test_arn, PROVIDER_AWS, region=stack_region, expect_failure=False)

Expand Down Expand Up @@ -121,11 +128,43 @@ def start(self):


def cleanup(self, **kwargs):
self.stack_manager.delete_stack()
if self.stack_manager:
self.stack_manager.delete_stack()
return super().cleanup(**kwargs)



def cleanup_existing_aws_cloud_accounts(self, account_id):
"""
Cleanup existing aws cloud accounts.
"""

if not account_id:
raise Exception("account_id is required")

body = {
"pageSize": 100,
"pageNum": 0,
"innerFilters": [
{
"provider": PROVIDER_AWS,
"providerInfo.accountID":account_id
}
]
}
res = self.backend.get_cloud_accounts(body=body)

if "response" in res:
if len(res["response"]) == 0:
Logger.logger.info(f"No existing aws cloud accounts to cleanup for account_id {account_id}")
return
for account in res["response"]:
guid = account["guid"]
self.backend.delete_cloud_account(guid)
Logger.logger.info(f"Deleted cloud account with guid {guid} for account_id {account_id}")

return res

def get_and_validate_cspm_link(self, region) -> str:
"""
Get and validate cspm link.
Expand Down Expand Up @@ -156,7 +195,8 @@ def create_and_validate_cloud_account_with_cspm(self, cloud_account_name:str, ar
try:
res = self.backend.create_cloud_account(body=body, provider=provider)
except Exception as e:
Logger.logger.error(f"failed to create cloud account, body used: {body}, error is {e}")
if not expect_failure:
Logger.logger.error(f"failed to create cloud account, body used: {body}, error is {e}")
failed = True

assert failed == expect_failure, f"expected_failure is {expect_failure}, but failed is {failed}, body used: {body}"
Expand Down