Skip to content

Commit

Permalink
terraform_plan/direct_references: referenced_by should read resource_…
Browse files Browse the repository at this point in the history
…changes
  • Loading branch information
refeed authored and arunim2405 committed Nov 1, 2023
1 parent c03a75b commit 0f71ea7
Show file tree
Hide file tree
Showing 3 changed files with 2,531 additions and 27 deletions.
80 changes: 53 additions & 27 deletions src/tirith/providers/terraform_plan/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,19 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs:
# referenced by `referenced_by` return true, otherwise false
config_resources = input_data.get("configuration", {}).get("root_module", {}).get("resources", [])
resource_type = provider_inputs.get("terraform_resource_type")
resource_changes = input_data.get("resource_changes", [])
referenced_by = provider_inputs.get("referenced_by")

reference_target = set()
reference_target_addresses = set()
is_resource_found = False

# Loop for adding reference_target
for config_resource in config_resources:
if config_resource.get("type") != resource_type:
for resource_change in resource_changes:
if resource_change.get("type") != resource_type and resource_change.get("change", {}).get("actions") != [
"delete"
]:
continue
reference_target.add(config_resource.get("address"))
reference_target_addresses.add(resource_change.get("address"))
is_resource_found = True

if not is_resource_found:
Expand All @@ -239,50 +242,73 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs:
return

# Loop for removing reference_target
for config_resource in config_resources:
if config_resource.get("type") != referenced_by:
for resource_change in resource_changes:
if resource_change.get("type") != referenced_by:
continue

for expression_val_dict in config_resource.get("expressions", {}).values():
if not isinstance(expression_val_dict, dict):
continue
# Look to the resource_config to get the references
for resource_config in get_resource_config_by_type(input_data, referenced_by):
for expression_val_dict in resource_config.get("expressions", {}).values():
if not isinstance(expression_val_dict, dict):
continue

for reference in expression_val_dict.get("references", []):
if reference in reference_target:
reference_target.remove(reference)
for reference_address in expression_val_dict.get("references", []):
if reference_address in reference_target_addresses:
reference_target_addresses.remove(reference_address)

is_all_referenced = len(reference_target) == 0
is_all_referenced = len(reference_target_addresses) == 0
outputs.append({"value": is_all_referenced, "meta": config_resources})


def get_resource_config_by_type(input_data: dict, resource_type: str) -> iter:
"""
Get all of the resource config by type
:param input_data: The input data
:param resource_type: The resource type
:return: The resource config (iterable)
"""
config_resources = input_data.get("configuration", {}).get("root_module", {}).get("resources", [])

for config_resource in config_resources:
if config_resource.get("type") != resource_type:
continue
yield config_resource


def direct_references_operator_references_to(input_data: dict, provider_inputs: dict, outputs: list):
# The exact opposite of `direct_references_operator_referenced_by`
config_resources = input_data.get("configuration", {}).get("root_module", {}).get("resources", [])
resource_changes = input_data.get("resource_changes", [])
resource_type = provider_inputs.get("terraform_resource_type")
references_to = provider_inputs.get("references_to")
references_to_type = provider_inputs.get("references_to")

resource_type_count = 0
reference_count = 0
is_resource_found = False

for config_resource in config_resources:
if config_resource.get("type") != resource_type:
for resource_change in resource_changes:
if resource_change.get("type") != resource_type:
continue
is_resource_found = True
resource_type_count += 1
resource_change_address = resource_change.get("address")

for expression_val_dict in config_resource.get("expressions", {}).values():
if not isinstance(expression_val_dict, dict):
# Look to the resource_config to get the references
for resource_config in get_resource_config_by_type(input_data, resource_type):
if resource_config.get("address") != resource_change_address:
continue

for reference in expression_val_dict.get("references", []):
reference_res_type = reference.split(".")[0]
if reference_res_type == references_to:
reference_count += 1
# We break early because most of the times the references
# list contains something like this:
# ["aws_s3_bucket.a.id", "aws_s3_bucket.a"]
break
for expression_val_dict in resource_config.get("expressions", {}).values():
if not isinstance(expression_val_dict, dict):
continue
for reference in expression_val_dict.get("references", []):
reference_res_type = reference.split(".")[0]
if reference_res_type == references_to_type:
reference_count += 1
# We break early because most of the times the references
# list contains something like this:
# ["aws_s3_bucket.a.id", "aws_s3_bucket.a"]
break

if not is_resource_found:
outputs.append(
Expand Down
Loading

0 comments on commit 0f71ea7

Please sign in to comment.