diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index f24917f5..6ec2a727 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -5,7 +5,7 @@ import yaml from types import CodeType -from typing import Any, Dict, List, Tuple, Optional +from typing import Any, Dict, List, Tuple, Optional, Union from tirith.providers.common import ProviderError from ..providers import PROVIDERS_DICT @@ -81,8 +81,21 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module): has_evaluation_passed = None continue + # Run evaluation on the provider's input value evaluation_result = evaluator_instance.evaluate(evaluator_input["value"], evaluator_data) - evaluation_result["meta"] = evaluator_input.get("meta") + + # Copy metadata and addresses if provided by the provider + if "meta" in evaluator_input: + evaluation_result["meta"] = evaluator_input["meta"] + + if "addresses" in evaluator_input: + # Add addresses directly + addresses = evaluator_input["addresses"] + # TODO: We need to make a model class for the `evaluator_input` and move this validation there + if not isinstance(addresses, list): + raise Exception("`addresses` should be a list") + evaluation_result["addresses"] = addresses + evaluation_results.append(evaluation_result) has_valid_evaluation = True diff --git a/src/tirith/prettyprinter.py b/src/tirith/prettyprinter.py index 4134ba74..c731c0ea 100644 --- a/src/tirith/prettyprinter.py +++ b/src/tirith/prettyprinter.py @@ -99,6 +99,15 @@ def pretty_print_result_dict(final_result_dict: Dict) -> None: for result_num, result_dict in enumerate(check_dict["result"]): result_message = result_dict["message"] + + # Include addresses in the message if it exists in the result_dict + if "addresses" in result_dict: + addresses = result_dict["addresses"] + # Format addresses as a comma-separated string + if isinstance(addresses, list) and addresses: + addresses_str = ", ".join(addresses) + result_message = f"{result_message} - (Addresses: `{addresses_str}`)" + if result_dict["passed"]: print(TermStyle.green(f" {result_num+1}. PASSED: {result_message}")) elif check_dict["passed"] is None: diff --git a/src/tirith/providers/json/handler.py b/src/tirith/providers/json/handler.py index ce4ced5b..a2b1b6b8 100644 --- a/src/tirith/providers/json/handler.py +++ b/src/tirith/providers/json/handler.py @@ -43,7 +43,11 @@ def get_value(provider_args: Dict, input_data: Dict) -> List[dict]: ) ] - outputs = [create_result_dict(value=value, meta=None, err=None) for value in values] + outputs = [] + for value in values: + result = create_result_dict(value=value, meta=None, err=None) + result["addresses"] = [key_path] + outputs.append(result) return outputs diff --git a/src/tirith/providers/terraform_plan/handler.py b/src/tirith/providers/terraform_plan/handler.py index 04471756..9a41132b 100644 --- a/src/tirith/providers/terraform_plan/handler.py +++ b/src/tirith/providers/terraform_plan/handler.py @@ -8,7 +8,7 @@ # input->(list ["a.b","c", "d"],value of resource) # returns->[any, any, any] -from typing import Iterable, Tuple +from typing import Iterable, Tuple, List, Union, Any import pydash from ..common import ProviderError @@ -56,6 +56,20 @@ def _get_exp_attribute(split_expressions, input_data): return final_data +def _get_addresses_from_resource_change(resource_change: dict) -> List[str]: + """ + Helper function to extract addresses from a resource change. + + :param resource_change: A dictionary containing resource change information from Terraform plan + :type resource_change: dict + :return: A list containing the resource address if found, otherwise an empty list + :rtype: List[str] + + """ + address = resource_change.get("address") + return [address] if address is not None else [] + + def provide(provider_inputs, input_data): # """Provides the value of the attribute from the input_data""" outputs = [] @@ -89,6 +103,8 @@ def provide(provider_inputs, input_data): if resource_type in (resource_change["type"], "*"): is_resource_found = True input_resource_change_attrs = resource_change["change"]["after"] + addresses = _get_addresses_from_resource_change(resource_change) + # [local_is_found_attribute] (local scope) # Used to decide whether to append a None value for each specific resource that's missing the attribute if input_resource_change_attrs: @@ -97,24 +113,36 @@ def provide(provider_inputs, input_data): is_attribute_found = True local_is_found_attribute = True attribute_value = input_resource_change_attrs[attribute] - outputs.append( - { - "value": attribute_value, - "meta": resource_change, - "err": None, - } - ) + result = { + "value": attribute_value, + "meta": resource_change, + "err": None, + "addresses": addresses, + } + outputs.append(result) elif "." in attribute or "*" in attribute: evaluated_outputs = _wrapper_get_exp_attribute(attribute, input_resource_change_attrs) if evaluated_outputs: is_attribute_found = True local_is_found_attribute = True for evaluated_output in evaluated_outputs: - outputs.append({"value": evaluated_output, "meta": resource_change, "err": None}) + result = { + "value": evaluated_output, + "meta": resource_change, + "err": None, + "addresses": addresses, + } + outputs.append(result) # If we didn't find the attribute in this resource, add a None value so it still gets evaluated if not local_is_found_attribute: - outputs.append({"value": None, "meta": resource_change, "err": None}) + result = { + "value": None, + "meta": resource_change, + "err": None, + "addresses": addresses, + } + outputs.append(result) else: outputs.append( { @@ -153,14 +181,15 @@ def provide(provider_inputs, input_data): continue if resource_type in (resource_change["type"], "*"): is_resource_type_found = True + addresses = _get_addresses_from_resource_change(resource_change) for action in resource_change["change"]["actions"]: - outputs.append( - { - "value": action, - "meta": resource_change, - "err": None, - } - ) + result = { + "value": action, + "meta": resource_change, + "err": None, + "addresses": addresses, + } + outputs.append(result) if not is_resource_type_found: outputs.append( { @@ -175,6 +204,7 @@ def provide(provider_inputs, input_data): elif input_type == "count": count = 0 resource_meta = {} + addresses = [] resource_type = provider_inputs["terraform_resource_type"] for resource_change in resource_changes: # Skip if resource type is in exclude_resource_types when using wildcard @@ -184,15 +214,13 @@ def provide(provider_inputs, input_data): # No need to check if the resource is not found # because the count of a resource can be zero resource_meta = resource_change + # Add the address to our list of addresses if available + if "address" in resource_change: + addresses.append(resource_change["address"]) count += 1 - outputs.append( - { - "value": count, - "meta": resource_meta, - "err": None, - } - ) + result = {"value": count, "meta": resource_meta, "err": None, "addresses": addresses} + outputs.append(result) return outputs # CASE 4 elif input_type == "direct_dependencies": @@ -252,7 +280,6 @@ def provider_config_operator(input_data: dict, provider_inputs: dict, outputs: l continue is_provider_full_name_found = True - attribute_value = None if attribute_to_get == "version_constraint": @@ -271,12 +298,9 @@ def provider_config_operator(input_data: dict, provider_inputs: dict, outputs: l } ) return - outputs.append( - { - "value": attribute_value, - "meta": provider_config_dict, - } - ) + + result = {"value": attribute_value, "meta": provider_config_dict, "addresses": [terraform_provider_full_name]} + outputs.append(result) if not is_provider_full_name_found: outputs.append( @@ -297,7 +321,10 @@ def terraform_version_operator(input_data: dict, provider_inputs: dict, outputs: :param provider_inputs: The provider inputs :param outputs: The outputs """ - outputs.append({"value": input_data.get("terraform_version"), "meta": input_data}) + # For terraform_version, there's no specific address as it applies to the entire plan + outputs.append( + {"value": input_data.get("terraform_version"), "meta": input_data, "addresses": ["terraform_version"]} + ) def direct_dependencies_operator(input_data: dict, provider_inputs: dict, outputs: list): @@ -316,12 +343,16 @@ def direct_dependencies_operator(input_data: dict, provider_inputs: dict, output is_resource_found = False for resource in config_resources: - if resource.get("type") != resource_type: continue is_resource_found = True deps_resource_type = {resource_id.split(".")[0] for resource_id in resource.get("depends_on", [])} - outputs.append({"value": list(deps_resource_type), "meta": config_resources}) + result = {"value": list(deps_resource_type), "meta": config_resources} + # Add addresses if available + address = resource.get("address") + if address: + result["addresses"] = [address] + outputs.append(result) if not is_resource_found: outputs.append( @@ -386,13 +417,20 @@ def direct_references_operator_referenced_by(input_data: dict, provider_inputs: reference_address = f"{module_path}.{relative_reference_address}" if reference_address in reference_target_addresses: reference_target_addresses.remove(reference_address) - outputs.append( - {"value": True, "meta": {"address": reference_address, "referenced_by": resource_config}} - ) + result = { + "value": True, + "meta": {"referenced_by": resource_config}, + "addresses": [reference_address], + } + + outputs.append(result) # For all of the reference_target_addresses that don't have a reference - for reference_target_address in reference_target_addresses: - outputs.append({"value": False, "meta": {"address": reference_target_address, "referenced_by": {}}}) + for reference_target_addresses_item in reference_target_addresses: + result = {"value": False, "meta": {"referenced_by": {}}} + # Add addresses as a simple list + result["addresses"] = [reference_target_addresses_item] + outputs.append(result) def get_module_resources_by_type_recursive(module: dict, resource_type: str, current_module_path: str = "") -> iter: @@ -478,7 +516,11 @@ def direct_references_operator_references_to(input_data: dict, provider_inputs: return is_all_resource_type_references_to = resource_type_count == reference_count - outputs.append({"value": is_all_resource_type_references_to, "meta": config_resources}) + result = {"value": is_all_resource_type_references_to, "meta": config_resources} + # Simple list with resource type + # TODO: Use the real specific addresses + result["addresses"] = [resource_type] + outputs.append(result) def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: list): @@ -513,7 +555,7 @@ def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: return is_resource_found = False - + addresses = [] for resource in config_resources: if resource.get("type") != resource_type: @@ -529,8 +571,13 @@ def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: for reference in expressions_val.get("references", []): # Only get the resource type resource_references.add(reference.split(".")[0]) + addresses.append(reference) + + result = {"value": list(resource_references), "meta": resource} - outputs.append({"value": list(resource_references), "meta": resource}) + if addresses: + result["addresses"] = addresses + outputs.append(result) if not is_resource_found: outputs.append(