Skip to content

Commit

Permalink
randomizing the details of users, groups, applications and administra…
Browse files Browse the repository at this point in the history
…tive units
  • Loading branch information
mvelazc0 committed Jul 1, 2024
1 parent 337b05f commit 1cb9ec4
Show file tree
Hide file tree
Showing 15 changed files with 92,028 additions and 223 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ dotnet-uninstall-debian-packages.sh
*.wixpdb

#ignore text files
*.txt
#*.txt

# Ignore packages
*.deb
Expand Down
219 changes: 121 additions & 98 deletions BadZure.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ def get_ms_token_username_pass(tenant_id, username, password, scope):
logging.error (f'Error obtaining token. Http response: {response.status_code}')
logging.error (response.text)


def create_attack_path(attack_patch_config, users, applications, domain, password):

app_owner_assignments = {}
Expand Down Expand Up @@ -175,44 +174,6 @@ def create_attack_path(attack_patch_config, users, applications, domain, passwor

return initial_access_user, app_owner_assignments, user_role_assignments, app_role_assignments, app_api_permission_assignments


def create_attack_path_3(users, applications, domain, password):
attack_path_apps = {}

# Pick a random application
app_keys = list(applications.keys())
random_app = random.choice(app_keys)
app_id = applications[random_app]['display_name']

# Assign API permission to the application
api_permission_id = "9e3f62cf-ca93-4989-b6ce-bf83c28f9fe8" # ID for "RoleManagement.ReadWrite.Directory"

# Pick a random user for helpdesk admin role
helpdesk_admin_role_id = "729827e3-9c14-49f7-bb1b-9608f156bbb8" # ID for "Helpdesk Administrator"
user_keys = list(users.keys())
random_helpdesk_user = random.choice(user_keys)
helpdesk_user_principal_name = f"{users[random_helpdesk_user]['user_principal_name']}@{domain}"

# Pick another random user to be the owner
random_owner_user = random.choice(user_keys)
owner_user_principal_name = f"{users[random_owner_user]['user_principal_name']}@{domain}"

assignment_key = f"{random_app}-{random_helpdesk_user}-{random_owner_user}"

attack_path_apps[assignment_key] = {
'app_name': random_app,
'api_permission_id': api_permission_id,
'helpdesk_user_principal_name': helpdesk_user_principal_name,
'helpdesk_display_name': users[random_helpdesk_user]['display_name'],
'owner_user_principal_name': owner_user_principal_name,
'owner_display_name': users[random_owner_user]['display_name'],
'role_id': helpdesk_admin_role_id,
'password': password
}

return attack_path_apps


def load_config(file_path):
"""Load and return the configuration from a YAML file."""
try:
Expand Down Expand Up @@ -304,7 +265,6 @@ def create_random_assignments(users, groups, administrative_units, applications)

return user_group_assignments, user_au_assignments, user_role_assignments, app_role_assignments, app_api_permission_assignments


def generate_random_password(length=15):
if length < 8:
raise ValueError("Password length must be at least 8 characters")
Expand All @@ -320,74 +280,96 @@ def generate_random_password(length=15):
random.shuffle(password)
return ''.join(password)

def read_lines_from_file(file_path):
with open(file_path, mode='r') as file:
return [line.strip() for line in file.readlines()]

def load_users_from_csv(file_path):
import csv
def generate_user_details(first_names_file, last_names_file, number_of_users):

first_names = read_lines_from_file(first_names_file)
last_names = read_lines_from_file(last_names_file)
users = {}
with open(file_path, mode='r') as infile:
reader = csv.DictReader(infile)
for row in reader:
key = f"{row['FirstName']}.{row['LastName']}".lower()
users[key] = {
'user_principal_name': key,
'display_name': f"{row['FirstName']} {row['LastName']}",
'mail_nickname': key,
'password': generate_random_password()
}

for _ in range(number_of_users):
first_name = random.choice(first_names)
last_name = random.choice(last_names)
key = f"{first_name}.{last_name}".lower()
users[key] = {
'user_principal_name': key,
'display_name': f"{first_name.capitalize()} {last_name.capitalize()}",
'mail_nickname': key,
'password': generate_random_password()
}

return users

def load_groups_from_csv(file_path):
import csv
def generate_group_details(file_path, number_of_groups):

groups = {}
with open(file_path, mode='r') as infile:
reader = csv.DictReader(infile)
for row in reader:
key = row['DisplayName']
groups[key] = {
'display_name': row['DisplayName']
}

group_names = read_lines_from_file(file_path)

selected_groups = random.sample(group_names, number_of_groups)

for group_name in selected_groups:
groups[group_name] = {
'display_name': group_name
}

return groups

def load_applications_from_csv(file_path):
import csv
def generate_app_details(prefix_file, core_file, suffix_file, number_of_names):

prefixes = read_lines_from_file(prefix_file)
core_names = read_lines_from_file(core_file)
suffixes = read_lines_from_file(suffix_file)

app_names = set()

while len(app_names) < number_of_names:
prefix = random.choice(prefixes)
core_name = random.choice(core_names)
suffix = random.choice(suffixes) if random.random() > 0.5 else ""
app_name = f"{prefix}{core_name}{suffix}"
app_names.add(app_name)

applications = {}
with open(file_path, mode='r') as infile:
reader = csv.DictReader(infile)
for row in reader:
key = row['DisplayName'] # Use DisplayName as the key
applications[key] = {
'display_name': row['DisplayName']
}
for name in app_names:
applications[name] = {
'display_name': name
}

return applications

def load_administrative_units_from_csv(file_path):
import csv
administrative_units = {}
with open(file_path, mode='r') as infile:
reader = csv.DictReader(infile)
for row in reader:
key = row['DisplayName'] # Use DisplayName as the key
administrative_units[key] = {
'display_name': row['DisplayName']
}
return administrative_units
def generate_administrative_units_details(file_path, number_of_aunits):

aunits = {}

aunit_names = read_lines_from_file(file_path)

selected_groups = random.sample(aunit_names, number_of_aunits)

for group_name in selected_groups:
aunits[group_name] = {
'display_name': group_name
}

return aunits

def update_password(users, username, new_password):
if username in users:
users[username]['password'] = new_password
else:
print(f"User {username} not found.")


@click.group()
def cli():
pass

@cli.command()
@click.option('--verbose', is_flag=True, help="Enable verbose output")
def build(verbose):
"""Build and configure Azure AD users and groups"""

"""Set up Azure AD tenant"""
azure_config_dir = os.path.expanduser('~/.azure')
os.environ['AZURE_CONFIG_DIR'] = azure_config_dir

Expand All @@ -396,21 +378,28 @@ def build(verbose):
config = load_config('badzure.yml')
tenant_id = config['tenant']['tenant_id']
domain = config['tenant']['domain']

max_users = config['tenant']['users']
max_groups = config['tenant']['groups']
max_apps = config['tenant']['applications']
max_aunits = config['tenant']['administrative_units']


# Load users data from CSV
users = load_users_from_csv('Csv/users.csv')
# Generate random users
users = generate_user_details('entity_data/first-names.txt', 'entity_data/last-names.txt', max_users)

# Load groups data from CSV
groups = load_groups_from_csv('Csv/groups.csv')
# Generate random groups
groups = generate_group_details('entity_data/group-names.txt', max_groups)

# Load applications data from CSV
applications = load_applications_from_csv('Csv/apps.csv')
# Generate random application registrations
applications = generate_app_details('entity_data/app-prefixes.txt', 'entity_data/app-core-names.txt','entity_data/app-sufixes.txt', max_apps)

# Load administrative units data from CSV
administrative_units = load_administrative_units_from_csv('Csv/a_units.csv')
# Generate random administratuve units
administrative_units = generate_administrative_units_details('entity_data/administrative-units.txt', max_aunits)

# Create random assignments
logging.info("Creating random assignments for users, groups, and administrative units.")
logging.info("Creating random assignments for groups, administrative units, azure ad roles and graph api permissions")

user_group_assignments, user_au_assignments, user_role_assignments, app_role_assignments, app_api_permission_assignments = create_random_assignments(users, groups, administrative_units, applications)

attack_path_application_owner_assignments, attack_path_user_role_assignments, attack_path_app_role_assignments, attack_path_app_api_permission_assignments = {}, {}, {}, {}
Expand Down Expand Up @@ -472,7 +461,7 @@ def build(verbose):
logging.error(stderr)
return

logging.info(f"Calling terraform apply, to create resources, this may take several minutes ...")
logging.info(f"Calling terraform apply to create resources, this may take several minutes ...")
return_code, stdout, stderr = tf.apply(skip_plan=True, capture_output=not verbose)
if return_code != 0:
logging.error(f"Terraform apply failed: {stderr}")
Expand All @@ -494,12 +483,46 @@ def build(verbose):
logging.info(f"Obtaining tokens")
logging.info(f"Access Token: {tokens['access_token']}")
logging.info(f"Refresh Token: {tokens['refresh_token']}")


@cli.command()
@click.option('--verbose', is_flag=True, help="Enable verbose output")
def show(verbose):
"""Show the created resources in Azure AD tenant"""

# Ensure terraform.tfvars.json exists
tfvars_path = os.path.join(TERRAFORM_DIR, 'terraform.tfvars.json')
if not os.path.exists(tfvars_path):
logging.error("Error: terraform.tfvars.json file not found.")
return

# Initialize the Terraform configuration
return_code, stdout, stderr = tf.init()
if return_code != 0:
logging.error(f"Terraform init failed: {stderr}")
return

logging.info(f"Calling terraform show to display the current state ...")

# Execute the terraform show command
return_code, stdout, stderr = tf.show(capture_output=not verbose)

if return_code != 0:
logging.error(f"Terraform show failed: {stderr}")
logging.error(stdout)
logging.error(stderr)
return

if verbose:
print(stdout)
else:
logging.info(stdout)

logging.info("Current state of Azure AD tenant resources displayed successfully.")

@cli.command()
@click.option('--verbose', is_flag=True, help="Enable verbose output")
def destroy(verbose):

"""Destroy created resources in Azure AD tenant"""

# Ensure terraform.tfvars.json exists
tfvars_path = os.path.join(TERRAFORM_DIR, 'terraform.tfvars.json')
Expand Down Expand Up @@ -528,16 +551,16 @@ def destroy(verbose):

# Remove the state files after destroying the resources
logging.info(f"Deleting terraform state files ")
for file in ["terraform.tfstate", "terraform.tfstate.backup"]:
for file in ["terraform.tfstate", "terraform.tfstate.backup", "terraform.tfvars.json"]:
try:
os.remove(os.path.join(TERRAFORM_DIR, file))
except FileNotFoundError:
pass

logging.info("Good bye.")


if __name__ == '__main__':

setup_logging(logging.INFO)
print (banner)
time.sleep(2)
Expand Down
21 changes: 0 additions & 21 deletions Csv/a_units.csv

This file was deleted.

21 changes: 0 additions & 21 deletions Csv/apps.csv

This file was deleted.

10 changes: 0 additions & 10 deletions Csv/groups.csv

This file was deleted.

Loading

0 comments on commit 1cb9ec4

Please sign in to comment.