Skip to content

Commit

Permalink
Progress on implementing --mode changes
Browse files Browse the repository at this point in the history
  • Loading branch information
pyth0n1c committed Aug 3, 2023
1 parent 816f4f7 commit 174a0c3
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 152 deletions.
48 changes: 13 additions & 35 deletions contentctl/actions/detection_testing/GitHubService.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,21 @@ def get_detections_changed(self, director: DirectorOutputDto) -> list[Detection]
f"Error: self.repo must be initialized before getting changed detections."
)
)
raise (Exception("not implemented"))

differences = self.repo.git.diff("--name-status", self.config.main_branch).split("\n")
new_content = list(filter(lambda x: x.split('\t')[1].startswith("A") , differences))
modified_content = list(filter(lambda x: x.split('\t')[1].startswith("M") , differences))
deleted_content =list(filter(lambda x: x.split('\t')[1].startswith("D") , differences))

content_to_test = list(filter(lambda x: x.startswith("detections"), new_content+modified_content ))

import code
code.interact(local=locals())

return []

def __init__(self, config: TestConfig):
self.repo = None
self.repo = git.Repo(config.repo_path)
self.requested_detections: list[pathlib.Path] = []
self.config = config

Expand Down Expand Up @@ -174,39 +184,7 @@ def __init__(self, config: TestConfig):
return

elif config.mode == DetectionTestingMode.changes:
# Changes is ONLY possible if the app is version controlled
# in a github repo. Ensure that this is the case and, if not
# raise an exception
raise (Exception("Mode [changes] is not yet supported."))
try:
repo = git.Repo(config.repo_path)
except Exception as e:
raise (
Exception(
f"Error: detection mode [{config.mode}] REQUIRES that [{config.repo_path}] is a git repository, but it is not."
)
)
if config.main_branch == config.test_branch:
raise (
Exception(
f"Error: test_branch [{config.test_branch}] is the same as the main_branch [{config.main_branch}]. When using mode [{config.mode}], these two branches MUST be different."
)
)

# Ensure that the test branch is checked out
if self.repo.active_branch.name != config.test_branch:
raise (
Exception(
f"Error: detection mode [{config.mode}] REQUIRES that the test_branch [{config.test_branch}] be checked out at the beginning of the test, but it is not."
)
)

# Ensure that the base branch exists

if Utils.validate_git_branch_name(
config.repo_path, "NO_URL", config.main_branch
):
return
return

elif config.mode == DetectionTestingMode.all:
return
Expand Down
246 changes: 129 additions & 117 deletions contentctl/objects/test_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Needed for a staticmethod to be able to return an instance of the class it belongs to
from __future__ import annotations


import git
import validators
import pathlib
import yaml
Expand Down Expand Up @@ -46,9 +46,10 @@ class TestConfig(BaseModel, extra=Extra.forbid, validate_assignment=True):
default=None,
title="HTTP(s) path to the repo for repo_path. If this field is blank, it will be inferred from the repo",
)
# main_branch: Union[str,None] = Field(default=None, title="Main branch of the repo, if applicable.")
# test_branch: Union[str,None] = Field(default=None, title="Branch of the repo to be tested, if applicable.")
# commit_hash: Union[str,None] = Field(default=None, title="Commit hash of the repo state to be tested, if applicable")
main_branch: Union[str,None] = Field(default=None, title="Main branch of the repo, if applicable.")
test_branch: Union[str,None] = Field(default=None, title="Branch of the repo to be tested, if applicable.")
commit_hash: Union[str,None] = Field(default=None, title="Commit hash of the repo state to be tested, if applicable")

target_infrastructure: DetectionTestingTargetInfrastructure = Field(
default=DetectionTestingTargetInfrastructure.container,
title=f"Control where testing should be launched. Choose one of {DetectionTestingTargetInfrastructure._member_names_}",
Expand All @@ -75,7 +76,7 @@ class TestConfig(BaseModel, extra=Extra.forbid, validate_assignment=True):
num_containers: int = Field(
default=1, title="Number of testing containers to start in parallel."
)
# pr_number: Union[int,None] = Field(default=None, title="The number of the PR to test")
pr_number: Union[int,None] = Field(default=None, title="The number of the PR to test")
splunk_app_username: Union[str, None] = Field(
default="admin", title="The name of the user for testing"
)
Expand Down Expand Up @@ -143,101 +144,112 @@ def validate_ports_overlap(cls, v):
# Ensure that at least 1 of test_branch, commit_hash, and/or pr_number were passed.
# Otherwise, what are we testing??
# @root_validator(pre=False)
# def ensure_there_is_something_to_test(cls, values):
# if 'test_branch' not in values and 'commit_hash' not in values and'pr_number' not in values:
# if 'mode' in values and values['mode'] == DetectionTestingMode.changes:
# raise(ValueError(f"Under mode [{DetectionTestingMode.changes}], 'test_branch', 'commit_hash', and/or 'pr_number' must be defined so that we know what to test."))

# return values

# @validator('repo_path', always=True)
# def validate_repo_path(cls,v):

# try:
# path = pathlib.Path(v)
# except Exception as e:
# raise(ValueError(f"Error, the provided path is is not a valid path: '{v}'"))

# try:
# r = git.Repo(path)
# except Exception as e:
# raise(ValueError(f"Error, the provided path is not a valid git repo: '{path}'"))

# try:

# if ALWAYS_PULL_REPO:
# r.remotes.origin.pull()
# except Exception as e:
# raise ValueError(f"Error pulling git repository {v}: {str(e)}")

# return v

# @validator('repo_url', always=True)
# def validate_repo_url(cls, v, values):
# Utils.check_required_fields('repo_url', values, ['repo_path'])

# #First try to get the value from the repo
# try:
# remote_url_from_repo = git.Repo(values['repo_path']).remotes.origin.url
# except Exception as e:
# raise(ValueError(f"Error reading remote_url from the repo located at {values['repo_path']}"))

# if v is not None and remote_url_from_repo != v:
# raise(ValueError(f"The url of the remote repo supplied in the config file {v} does not "\
# f"match the value read from the repository at {values['repo_path']}, {remote_url_from_repo}"))

# if v is None:
# v = remote_url_from_repo

# #Ensure that the url is the proper format
# try:
# if bool(validators.url(v)) == False:
# raise(Exception)
# except:
# raise(ValueError(f"Error validating the repo_url. The url is not valid: {v}"))

# return v

# @validator('main_branch', always=True)
# def valid_main_branch(cls, v, values):
# Utils.check_required_fields('main_branch', values, ['repo_path', 'repo_url'])

# if v is None:
# print(f"main_branch is not supplied. Inferring from '{values['repo_path']}'...",end='')

# main_branch = Utils.get_default_branch_name(values['repo_path'], values['repo_url'])
# print(f"main_branch name '{main_branch}' inferred'")
# #continue with the validation
# v = main_branch

# try:
# Utils.validate_git_branch_name(values['repo_path'],values['repo_url'], v)
# except Exception as e:
# raise ValueError(f"Error validating main_branch: {str(e)}")
# return v

# @validator('test_branch', always=True)
# def validate_test_branch(cls, v, values):
# Utils.check_required_fields('test_branch', values, ['repo_path', 'repo_url', 'main_branch'])
# if v is None:
# print(f"No test_branch provided, so we will default to using the main_branch '{values['main_branch']}'")
# return values['main_branch']
# try:
# Utils.validate_git_branch_name(values['repo_path'],values['repo_url'], v)
# except Exception as e:
# raise ValueError(f"Error validating test_branch: {str(e)}")
# return v

# @validator('commit_hash', always=True)
# def validate_commit_hash(cls, v, values):
# Utils.check_required_fields('commit_hash', values, ['repo_path', 'repo_url', 'test_branch'])

# try:
# #We can a hash with this function too
# Utils.validate_git_hash(values['repo_path'],values['repo_url'], v, values['test_branch'])
# except Exception as e:
# raise ValueError(f"Error validating commit_hash '{v}': {str(e)}")
# return v
def ensure_there_is_something_to_test(cls, values):
if 'test_branch' not in values and 'commit_hash' not in values and'pr_number' not in values:
if 'mode' in values and values['mode'] == DetectionTestingMode.changes:
raise(ValueError(f"Under mode [{DetectionTestingMode.changes}], 'test_branch', 'commit_hash', and/or 'pr_number' must be defined so that we know what to test."))

return values

@validator('repo_path', always=True)
def validate_repo_path(cls,v):
print(f"checking repo path '{v}'")
try:
path = pathlib.Path(v)
except Exception as e:
print("exception 1")
raise(ValueError(f"Error, the provided path is is not a valid path: '{v}'"))

try:
r = git.Repo(path)
except Exception as e:
print(f"exception 2: {str(e)}")
raise(ValueError(f"Error, the provided path is not a valid git repo: '{path}'"))

try:

if ALWAYS_PULL_REPO:
r.remotes.origin.pull()
except Exception as e:
print("exception 3")
raise ValueError(f"Error pulling git repository {v}: {str(e)}")
print("repo path looks good")
return v

@validator('repo_url', always=True)
def validate_repo_url(cls, v, values):
Utils.check_required_fields('repo_url', values, ['repo_path'])

#First try to get the value from the repo
try:
remote_url_from_repo = git.Repo(values['repo_path']).remotes.origin.url
except Exception as e:
raise(ValueError(f"Error reading remote_url from the repo located at {values['repo_path']}"))

if v is not None and remote_url_from_repo != v:
raise(ValueError(f"The url of the remote repo supplied in the config file {v} does not "\
f"match the value read from the repository at {values['repo_path']}, {remote_url_from_repo}"))

if v is None:
v = remote_url_from_repo

#Ensure that the url is the proper format
try:
if bool(validators.url(v)) == False:
raise(Exception)
except:
raise(ValueError(f"Error validating the repo_url. The url is not valid: {v}"))

return v
@validator('main_branch', always=True)
def valid_main_branch(cls, v, values):
Utils.check_required_fields('main_branch', values, ['repo_path', 'repo_url'])
print("checking the branch")
if v is None:
print(f"main_branch is not supplied. Inferring from '{values['repo_path']}'...",end='')

main_branch = Utils.get_default_branch_name(values['repo_path'], values['repo_url'])
print(f"main_branch name '{main_branch}' inferred'")
#continue with the validation
v = main_branch

try:
Utils.validate_git_branch_name(values['repo_path'],values['repo_url'], v)
except Exception as e:
raise ValueError(f"Error validating main_branch: {str(e)}")
return v

@validator('test_branch', always=True)
def validate_test_branch(cls, v, values):
Utils.check_required_fields('test_branch', values, ['repo_path', 'repo_url', 'main_branch'])
if v is None:
print(f"No test_branch provided, so we will default to using the main_branch '{values['main_branch']}'")
v = values['main_branch']
try:
Utils.validate_git_branch_name(values['repo_path'],values['repo_url'], v)
except Exception as e:
raise ValueError(f"Error validating test_branch: {str(e)}")

r = git.Repo(values.get("repo_path"))
try:
if r.active_branch != v:
print(f"We are trying to test {v} but the current active branch is {r.active_branch}")
print(f"Checking out {v}")
r.git.checkout(v)
except Exception as e:
raise ValueError(f"Error checking out test_branch '{v}': {str(e)}")
return v

@validator('commit_hash', always=True)
def validate_commit_hash(cls, v, values):
Utils.check_required_fields('commit_hash', values, ['repo_path', 'repo_url', 'test_branch'])

try:
#We can a hash with this function too
Utils.validate_git_hash(values['repo_path'],values['repo_url'], v, values['test_branch'])
except Exception as e:
raise ValueError(f"Error validating commit_hash '{v}': {str(e)}")
return v

@validator("full_image_path", always=True)
def validate_full_image_path(cls, v, values):
Expand Down Expand Up @@ -397,27 +409,27 @@ def validate_num_containers(cls, v):
)
return v

# @validator('pr_number', always=True)
# def validate_pr_number(cls, v, values):
# Utils.check_required_fields('pr_number', values, ['repo_path', 'commit_hash'])
@validator('pr_number', always=True)
def validate_pr_number(cls, v, values):
Utils.check_required_fields('pr_number', values, ['repo_path', 'commit_hash'])

# if v == None:
# return v
if v == None:
return v

# hash = Utils.validate_git_pull_request(values['repo_path'], v)
hash = Utils.validate_git_pull_request(values['repo_path'], v)

# #Ensure that the hash is equal to the one in the config file, if it exists.
# if values['commit_hash'] is None:
# values['commit_hash'] = hash
# else:
# if values['commit_hash'] != hash:
# raise(ValueError(f"commit_hash specified in configuration was {values['commit_hash']}, but commit_hash"\
# f" from pr_number {v} was {hash}. These must match. If you're testing"\
# " a PR, you probably do NOT want to provide the commit_hash in the configuration file "\
# "and always want to test the head of the PR. This will be done automatically if you do "\
# "not provide the commit_hash."))
#Ensure that the hash is equal to the one in the config file, if it exists.
if values['commit_hash'] is None:
values['commit_hash'] = hash
else:
if values['commit_hash'] != hash:
raise(ValueError(f"commit_hash specified in configuration was {values['commit_hash']}, but commit_hash"\
f" from pr_number {v} was {hash}. These must match. If you're testing"\
" a PR, you probably do NOT want to provide the commit_hash in the configuration file "\
"and always want to test the head of the PR. This will be done automatically if you do "\
"not provide the commit_hash."))

# return v
return v

@validator("splunk_app_password", always=True)
def validate_splunk_app_password(cls, v):
Expand Down

0 comments on commit 174a0c3

Please sign in to comment.