Skip to content
Open
17 changes: 15 additions & 2 deletions src/tirith/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import yaml
from types import CodeType

from typing import Any, Dict, List, Tuple, Optional
from typing import Any, Dict, List, Tuple, Optional, Union

from tirith.providers.common import ProviderError
from ..providers import PROVIDERS_DICT
Expand Down Expand Up @@ -81,8 +81,21 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module):
has_evaluation_passed = None
continue

# Run evaluation on the provider's input value
evaluation_result = evaluator_instance.evaluate(evaluator_input["value"], evaluator_data)
evaluation_result["meta"] = evaluator_input.get("meta")

# Copy metadata and addresses if provided by the provider
if "meta" in evaluator_input:
evaluation_result["meta"] = evaluator_input["meta"]

if "addresses" in evaluator_input:
# Add addresses directly
addresses = evaluator_input["addresses"]
# TODO: We need to make a model class for the `evaluator_input` and move this validation there
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the type assertion mentioned in #262 (comment) here

                if not isinstance(addresses, list):
                    raise Exception("`addresses` should be a list")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the comment to the top of line 94

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

                # TODO: We need to make a model class for the `evaluator_input` and move this validation there
                if not isinstance(addresses, list):
                    raise Exception("`addresses` should be a 
 

if not isinstance(addresses, list):
raise Exception("`addresses` should be a list")
evaluation_result["addresses"] = addresses

evaluation_results.append(evaluation_result)
has_valid_evaluation = True

Expand Down
9 changes: 9 additions & 0 deletions src/tirith/prettyprinter.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ def pretty_print_result_dict(final_result_dict: Dict) -> None:

for result_num, result_dict in enumerate(check_dict["result"]):
result_message = result_dict["message"]

# Include addresses in the message if it exists in the result_dict
if "addresses" in result_dict:
addresses = result_dict["addresses"]
# Format addresses as a comma-separated string
if isinstance(addresses, list) and addresses:
addresses_str = ", ".join(addresses)
result_message = f"{result_message} - (Addresses: `{addresses_str}`)"

if result_dict["passed"]:
print(TermStyle.green(f" {result_num+1}. PASSED: {result_message}"))
elif check_dict["passed"] is None:
Expand Down
6 changes: 5 additions & 1 deletion src/tirith/providers/json/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ def get_value(provider_args: Dict, input_data: Dict) -> List[dict]:
)
]

outputs = [create_result_dict(value=value, meta=None, err=None) for value in values]
outputs = []
for value in values:
result = create_result_dict(value=value, meta=None, err=None)
result["addresses"] = [key_path]
outputs.append(result)

return outputs

Expand Down
131 changes: 89 additions & 42 deletions src/tirith/providers/terraform_plan/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

# input->(list ["a.b","c", "d"],value of resource)
# returns->[any, any, any]
from typing import Iterable, Tuple
from typing import Iterable, Tuple, List, Union, Any
import pydash

from ..common import ProviderError
Expand Down Expand Up @@ -56,6 +56,20 @@ def _get_exp_attribute(split_expressions, input_data):
return final_data


def _get_addresses_from_resource_change(resource_change: dict) -> List[str]:
"""
Helper function to extract addresses from a resource change.

:param resource_change: A dictionary containing resource change information from Terraform plan
:type resource_change: dict
:return: A list containing the resource address if found, otherwise an empty list
:rtype: List[str]

"""
address = resource_change.get("address")
return [address] if address is not None else []


def provide(provider_inputs, input_data):
# """Provides the value of the attribute from the input_data"""
outputs = []
Expand Down Expand Up @@ -89,6 +103,8 @@ def provide(provider_inputs, input_data):
if resource_type in (resource_change["type"], "*"):
is_resource_found = True
input_resource_change_attrs = resource_change["change"]["after"]
addresses = _get_addresses_from_resource_change(resource_change)

# [local_is_found_attribute] (local scope)
# Used to decide whether to append a None value for each specific resource that's missing the attribute
if input_resource_change_attrs:
Expand All @@ -97,24 +113,36 @@ def provide(provider_inputs, input_data):
is_attribute_found = True
local_is_found_attribute = True
attribute_value = input_resource_change_attrs[attribute]
outputs.append(
{
"value": attribute_value,
"meta": resource_change,
"err": None,
}
)
result = {
"value": attribute_value,
"meta": resource_change,
"err": None,
"addresses": addresses,
}
outputs.append(result)
elif "." in attribute or "*" in attribute:
evaluated_outputs = _wrapper_get_exp_attribute(attribute, input_resource_change_attrs)
if evaluated_outputs:
is_attribute_found = True
local_is_found_attribute = True
for evaluated_output in evaluated_outputs:
outputs.append({"value": evaluated_output, "meta": resource_change, "err": None})
result = {
"value": evaluated_output,
"meta": resource_change,
"err": None,
"addresses": addresses,
}
outputs.append(result)

# If we didn't find the attribute in this resource, add a None value so it still gets evaluated
if not local_is_found_attribute:
outputs.append({"value": None, "meta": resource_change, "err": None})
result = {
"value": None,
"meta": resource_change,
"err": None,
"addresses": addresses,
}
outputs.append(result)
else:
outputs.append(
{
Expand Down Expand Up @@ -153,14 +181,15 @@ def provide(provider_inputs, input_data):
continue
if resource_type in (resource_change["type"], "*"):
is_resource_type_found = True
addresses = _get_addresses_from_resource_change(resource_change)
for action in resource_change["change"]["actions"]:
outputs.append(
{
"value": action,
"meta": resource_change,
"err": None,
}
)
result = {
"value": action,
"meta": resource_change,
"err": None,
"addresses": addresses,
}
outputs.append(result)
if not is_resource_type_found:
outputs.append(
{
Expand All @@ -175,6 +204,7 @@ def provide(provider_inputs, input_data):
elif input_type == "count":
count = 0
resource_meta = {}
addresses = []
resource_type = provider_inputs["terraform_resource_type"]
for resource_change in resource_changes:
# Skip if resource type is in exclude_resource_types when using wildcard
Expand All @@ -184,15 +214,13 @@ def provide(provider_inputs, input_data):
# No need to check if the resource is not found
# because the count of a resource can be zero
resource_meta = resource_change
# Add the address to our list of addresses if available
if "address" in resource_change:
addresses.append(resource_change["address"])
count += 1

outputs.append(
{
"value": count,
"meta": resource_meta,
"err": None,
}
)
result = {"value": count, "meta": resource_meta, "err": None, "addresses": addresses}
outputs.append(result)
return outputs
# CASE 4
elif input_type == "direct_dependencies":
Expand Down Expand Up @@ -252,7 +280,6 @@ def provider_config_operator(input_data: dict, provider_inputs: dict, outputs: l
continue

is_provider_full_name_found = True

attribute_value = None

if attribute_to_get == "version_constraint":
Expand All @@ -271,12 +298,9 @@ def provider_config_operator(input_data: dict, provider_inputs: dict, outputs: l
}
)
return
outputs.append(
{
"value": attribute_value,
"meta": provider_config_dict,
}
)

result = {"value": attribute_value, "meta": provider_config_dict, "addresses": [terraform_provider_full_name]}
outputs.append(result)

if not is_provider_full_name_found:
outputs.append(
Expand All @@ -297,7 +321,10 @@ def terraform_version_operator(input_data: dict, provider_inputs: dict, outputs:
:param provider_inputs: The provider inputs
:param outputs: The outputs
"""
outputs.append({"value": input_data.get("terraform_version"), "meta": input_data})
# For terraform_version, there's no specific address as it applies to the entire plan
outputs.append(
{"value": input_data.get("terraform_version"), "meta": input_data, "addresses": ["terraform_version"]}
)


def direct_dependencies_operator(input_data: dict, provider_inputs: dict, outputs: list):
Expand All @@ -316,12 +343,16 @@ def direct_dependencies_operator(input_data: dict, provider_inputs: dict, output
is_resource_found = False

for resource in config_resources:

if resource.get("type") != resource_type:
continue
is_resource_found = True
deps_resource_type = {resource_id.split(".")[0] for resource_id in resource.get("depends_on", [])}
outputs.append({"value": list(deps_resource_type), "meta": config_resources})
result = {"value": list(deps_resource_type), "meta": config_resources}
# Add addresses if available
address = resource.get("address")
if address:
result["addresses"] = [address]
outputs.append(result)

if not is_resource_found:
outputs.append(
Expand Down Expand Up @@ -386,13 +417,20 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs:
reference_address = f"{module_path}.{relative_reference_address}"
if reference_address in reference_target_addresses:
reference_target_addresses.remove(reference_address)
outputs.append(
{"value": True, "meta": {"address": reference_address, "referenced_by": resource_config}}
)
result = {
"value": True,
"meta": {"referenced_by": resource_config},
"addresses": [reference_address],
}

outputs.append(result)

# For all of the reference_target_addresses that don't have a reference
for reference_target_address in reference_target_addresses:
outputs.append({"value": False, "meta": {"address": reference_target_address, "referenced_by": {}}})
for reference_target_addresses_item in reference_target_addresses:
result = {"value": False, "meta": {"referenced_by": {}}}
# Add addresses as a simple list
result["addresses"] = [reference_target_addresses_item]
outputs.append(result)


def get_module_resources_by_type_recursive(module: dict, resource_type: str, current_module_path: str = "") -> iter:
Expand Down Expand Up @@ -478,7 +516,11 @@ def direct_references_operator_references_to(input_data: dict, provider_inputs:
return

is_all_resource_type_references_to = resource_type_count == reference_count
outputs.append({"value": is_all_resource_type_references_to, "meta": config_resources})
result = {"value": is_all_resource_type_references_to, "meta": config_resources}
# Simple list with resource type
# TODO: Use the real specific addresses
result["addresses"] = [resource_type]
outputs.append(result)


def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: list):
Expand Down Expand Up @@ -513,7 +555,7 @@ def direct_references_operator(input_data: dict, provider_inputs: dict, outputs:
return

is_resource_found = False

addresses = []
for resource in config_resources:

if resource.get("type") != resource_type:
Expand All @@ -529,8 +571,13 @@ def direct_references_operator(input_data: dict, provider_inputs: dict, outputs:
for reference in expressions_val.get("references", []):
# Only get the resource type
resource_references.add(reference.split(".")[0])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the address from the reference

addresses.append(reference)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@refeed I am unable to understand why do we need to do this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because that's the real addresses the optype is processing. The resource var that's placed in the meta key was actually not placed correctly, it only contains one resource even though this optype is processing more than one resources

addresses.append(reference)

result = {"value": list(resource_references), "meta": resource}

outputs.append({"value": list(resource_references), "meta": resource})
if addresses:
result["addresses"] = addresses
outputs.append(result)

if not is_resource_found:
outputs.append(
Expand Down