diff --git a/cid/builtin/core/data/resources.yaml b/cid/builtin/core/data/resources.yaml index 431eac72..cb820019 100644 --- a/cid/builtin/core/data/resources.yaml +++ b/cid/builtin/core/data/resources.yaml @@ -267,7 +267,47 @@ views: riFile: cid/summary_view_ri.sql File: cid/summary_view.sql dependsOn: - cur: true + cur: + - bill_billing_entity + - bill_billing_period_start_date + - bill_invoice_id + - bill_payer_account_id + - line_item_availability_zone + - line_item_legal_entity + - line_item_line_item_description + - line_item_line_item_type + - line_item_operation + - line_item_product_code + - line_item_resource_id + - line_item_unblended_cost + - line_item_usage_account_id + - line_item_usage_amount + - line_item_usage_start_date + - line_item_usage_type + - pricing_public_on_demand_cost + - pricing_unit + - product_current_generation + - product_database_engine + - product_from_location + - product_group + - product_operating_system + - product_physical_processor + - product_processor_features + - product_product_family + - product_product_name + - product_region + - product_servicecode + - product_tenancy + - product_to_location + - reservation_effective_cost + - reservation_reservation_a_r_n + - reservation_unused_amortized_upfront_fee_for_billing_period + - reservation_unused_recurring_fee + - savings_plan_amortized_upfront_commitment_for_billing_period + - savings_plan_savings_plan_a_r_n + - savings_plan_savings_plan_effective_cost + - savings_plan_total_commitment_to_date + - savings_plan_used_commitment ec2_running_cost: spriFile: cid/ec2_running_cost_sp_ri.sql @@ -275,17 +315,52 @@ views: riFile: cid/ec2_running_cost_ri.sql File: cid/ec2_running_cost.sql dependsOn: - cur: true + cur: + - bill_billing_period_start_date + - bill_payer_account_id + - line_item_line_item_type + - line_item_unblended_cost + - line_item_usage_account_id + - line_item_usage_amount + - line_item_usage_start_date + - line_item_usage_type + - reservation_effective_cost + - reservation_reservation_a_r_n + - savings_plan_savings_plan_a_r_n + - savings_plan_savings_plan_effective_cost compute_savings_plan_eligible_spend: File: cid/compute_savings_plan_eligible_spend.sql dependsOn: cur: true + - bill_billing_period_start_date + - bill_payer_account_id + - line_item_line_item_type + - line_item_operation + - line_item_product_code + - line_item_unblended_cost + - line_item_usage_account_id + - line_item_usage_start_date + - line_item_usage_type + - product_servicecode s3_view: File: cid/s3.sql dependsOn: - cur: true + cur: + - bill_billing_period_start_date + - bill_payer_account_id + - line_item_line_item_type + - line_item_operation + - line_item_product_code + - line_item_resource_id + - line_item_unblended_cost + - line_item_usage_account_id + - line_item_usage_amount + - line_item_usage_start_date + - pricing_public_on_demand_cost + - pricing_unit + - product_region ri_sp_mapping: spriFile: cid/ri_sp_mapping_sp_ri.sql @@ -293,21 +368,85 @@ views: riFile: cid/ri_sp_mapping_ri.sql File: cid/ri_sp_mapping.sql dependsOn: - cur: true + cur: + - bill_billing_period_start_date + - bill_payer_account_id + - line_item_line_item_type + - pricing_lease_contract_length + - pricing_offering_class + - pricing_purchase_option + - reservation_reservation_a_r_n + - savings_plan_end_time + - savings_plan_savings_plan_a_r_n + hourly_view: spriFile: cudos/hourly_view_sp_ri.sql spFile: cudos/hourly_view_sp.sql riFile: cudos/hourly_view_ri.sql File: cudos/hourly_view.sql dependsOn: - cur: true + cur: + - bill_billing_period_start_date + - bill_payer_account_id + - line_item_line_item_description + - line_item_line_item_type + - line_item_operation + - line_item_product_code + - line_item_unblended_cost + - line_item_usage_account_id + - line_item_usage_amount + - line_item_usage_start_date + - line_item_usage_type + - pricing_term + - pricing_unit + - product_region + - product_servicecode + - reservation_effective_cost + - reservation_reservation_a_r_n + - savings_plan_savings_plan_a_r_n + - savings_plan_savings_plan_effective_cost + resource_view: spriFile: cudos/resource_view_sp_ri.sql spFile: cudos/resource_view_sp.sql riFile: cudos/resource_view_ri.sql File: cudos/resource_view.sql dependsOn: - cur: true + cur: + - bill_billing_entity + - bill_payer_account_id + - line_item_legal_entity + - line_item_line_item_description + - line_item_line_item_type + - line_item_operation + - line_item_product_code + - line_item_resource_id + - line_item_unblended_cost + - line_item_usage_account_id + - line_item_usage_amount + - line_item_usage_start_date + - line_item_usage_type + - pricing_term + - pricing_unit + - product_database_engine + - product_deployment_option + - product_from_location + - product_group + - product_instance_type + - product_instance_type_family + - product_operating_system + - product_product_family + - product_product_name + - product_region + - product_servicecode + - product_storage + - product_to_location + - product_volume_api_name + - reservation_effective_cost + - reservation_reservation_a_r_n + - savings_plan_savings_plan_a_r_n + - savings_plan_savings_plan_effective_cost + # Trends daily_anomaly_detection: File: trends/daily_anomaly_detection.sql diff --git a/cid/common.py b/cid/common.py index 5e4eeb36..de9af5e6 100644 --- a/cid/common.py +++ b/cid/common.py @@ -125,17 +125,18 @@ def cur(self) -> CUR: if not self._clients.get('cur'): _cur = CUR(self.base.session) _cur.athena = self.athena + _cur.glue = self.glue print('Checking if CUR is enabled and available...') - if not _cur.configured: + if not _cur.metadata: raise CidCritical("Error: please ensure CUR is enabled, if yes allow it some time to propagate") - print(f'\tAthena table: {_cur.tableName}') - print(f"\tResource IDs: {'yes' if _cur.hasResourceIDs else 'no'}") - if not _cur.hasResourceIDs: + print(f'\tAthena table: {_cur.table_name}') + print(f"\tResource IDs: {'yes' if _cur.has_resource_ids else 'no'}") + if not _cur.has_resource_ids: raise CidCritical("Error: CUR has to be created with Resource IDs") - print(f"\tSavingsPlans: {'yes' if _cur.hasSavingsPlans else 'no'}") - print(f"\tReserved Instances: {'yes' if _cur.hasReservations else 'no'}") + print(f"\tSavingsPlans: {'yes' if _cur.has_savings_plans else 'no'}") + print(f"\tReserved Instances: {'yes' if _cur.has_reservations else 'no'}") print('\n') self._clients.update({ 'cur': _cur @@ -1360,7 +1361,7 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=Non # Check for required views _views = dataset_definition.get('dependsOn', {}).get('views', []) - required_views = [(self.cur.tableName if cur_required and name =='${cur_table_name}' else name) for name in _views] + required_views = [(self.cur.table_name if cur_required and name =='${cur_table_name}' else name) for name in _views] self.athena.discover_views(required_views) found_views = utils.intersection(required_views, self.athena._metadata.keys()) @@ -1369,7 +1370,7 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=Non if recursive: print(f"Detected views: {', '.join(found_views)}") for view_name in found_views: - if cur_required and view_name == self.cur.tableName: + if cur_required and view_name == self.cur.table_name: logger.debug(f'Dependancy view {view_name} is a CUR. Skip.') continue if view_name == 'account_map': @@ -1388,7 +1389,7 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=Non columns_tpl = { 'athena_datasource_arn': athena_datasource.arn, 'athena_database_name': self.athena.DatabaseName, - 'cur_table_name': self.cur.tableName if cur_required else None + 'cur_table_name': self.cur.table_name if cur_required else None } logger.debug(f'dataset_id={dataset_id}') @@ -1472,12 +1473,13 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=Non def create_or_update_view(self, view_name: str, recursive: bool=True, update: bool=False) -> None: - # For account mappings create a view using a special helper - if view_name in self._visited_views: # avoid checking a views multiple times in one cid session + # Avoid checking a views multiple times in one cid session + if view_name in self._visited_views: return - logger.info(f'Processing view: {view_name}') self._visited_views.append(view_name) + logger.info(f'Processing view: {view_name}') + # For account mappings create a view using a special helper if view_name in ['account_map', 'aws_accounts']: if view_name in self.athena._metadata.keys(): print(f'Account map {view_name} exists. Skipping.') @@ -1495,10 +1497,20 @@ def create_or_update_view(self, view_name: str, recursive: bool=True, update: bo logger.info(f"Definition is unavailable {view_name}") return logger.debug(f'View definition: {view_definition}') + dependencies = view_definition.get('dependsOn', {}) + + # Process CUR columns + if isinstance(dependencies.get('cur'), list): + for column in dependencies.get('cur'): + self.cur.ensure_column(column) + elif isinstance(dependencies.get('cur'), dict): + for column, column_type in dependencies.get('cur').items(): + self.cur.ensure_column(column, column_type) if recursive: - dependency_views = view_definition.get('dependsOn', dict()).get('views', list()) - if 'cur' in dependency_views: dependency_views.remove('cur') + dependency_views = dependencies.get('views', []) + if 'cur' in dependency_views: + dependency_views.remove('cur') # Discover dependency views (may not be discovered earlier) self.athena.discover_views(dependency_views) logger.info(f"Dependency views: {', '.join(dependency_views)}" if dependency_views else 'No dependency views') @@ -1588,11 +1600,11 @@ def get_view_query(self, view_name: str) -> str: # View path view_definition = self.get_definition("view", name=view_name) cur_required = view_definition.get('dependsOn', dict()).get('cur') - if cur_required and self.cur.hasSavingsPlans and self.cur.hasReservations and view_definition.get('spriFile'): + if cur_required and self.cur.has_savings_plans and self.cur.has_reservations and view_definition.get('spriFile'): view_definition['File'] = view_definition.get('spriFile') - elif cur_required and self.cur.hasSavingsPlans and view_definition.get('spFile'): + elif cur_required and self.cur.has_savings_plans and view_definition.get('spFile'): view_definition['File'] = view_definition.get('spFile') - elif cur_required and self.cur.hasReservations and view_definition.get('riFile'): + elif cur_required and self.cur.has_reservations and view_definition.get('riFile'): view_definition['File'] = view_definition.get('riFile') elif view_definition.get('File') or view_definition.get('Data') or view_definition.get('data'): pass @@ -1609,7 +1621,7 @@ def get_view_query(self, view_name: str) -> str: # Prepare template parameters columns_tpl = { - 'cur_table_name': self.cur.tableName if cur_required else None, + 'cur_table_name': self.cur.table_name if cur_required else None, 'athenaTableName': view_name, 'athena_database_name': self.athena.DatabaseName, } diff --git a/cid/helpers/__init__.py b/cid/helpers/__init__.py index 89946c66..41103d4c 100644 --- a/cid/helpers/__init__.py +++ b/cid/helpers/__init__.py @@ -1,6 +1,6 @@ from cid.helpers.athena import Athena -from cid.helpers.cur import CUR from cid.helpers.glue import Glue +from cid.helpers.cur import CUR from cid.helpers.diff import diff from cid.helpers.quicksight import QuickSight, Dashboard, Dataset, Datasource, Template from cid.helpers.csv2view import csv2view diff --git a/cid/helpers/account_map.py b/cid/helpers/account_map.py index fae3a7c8..dd44176e 100644 --- a/cid/helpers/account_map.py +++ b/cid/helpers/account_map.py @@ -159,7 +159,7 @@ def create(self, name) -> bool: # Fill in TPLs columns_tpl = { 'metadata_table_name': self._athena_table_name, - 'cur_table_name': self.cur.tableName # only for trends + 'cur_table_name': self.cur.table_name # only for trends } for key, val in self.mappings.get(name).get(self._athena_table_name).items(): logger.debug(f'Mapping field {key} to {val}') @@ -185,7 +185,7 @@ def get_dummy_account_mapping_sql(self, name) -> list: ).decode('utf-8')) columns_tpl = { 'athena_view_name': name, - 'cur_table_name': self.cur.tableName + 'cur_table_name': self.cur.table_name } compiled_query = template.safe_substitute(columns_tpl) return compiled_query diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index 4cafda21..a1933803 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -1,8 +1,10 @@ +""" Manage AWS CUR +""" import json import logging from cid.base import CidBase -from cid.helpers import Athena +from cid.helpers import Athena, Glue from cid.utils import get_parameter, get_parameters from cid.exceptions import CidCritical @@ -10,7 +12,9 @@ class CUR(CidBase): - curRequiredColumns = [ + """ Manage AWS CUR + """ + cur_minimal_required_columns = [ 'bill_bill_type', 'bill_billing_entity', 'bill_billing_period_end_date', @@ -24,8 +28,6 @@ class CUR(CidBase): 'line_item_line_item_type', 'line_item_operation', 'line_item_product_code', - 'line_item_resource_id', - #'line_item_resource_id', 'line_item_unblended_cost', 'line_item_usage_account_id', 'line_item_usage_amount', @@ -34,22 +36,8 @@ class CUR(CidBase): 'line_item_usage_type', 'pricing_term', 'pricing_unit', - 'product_database_engine', - 'product_deployment_option', - 'product_from_location', - 'product_group', - 'product_instance_type', - 'product_instance_type_family', - 'product_operating_system', - 'product_product_family', - 'product_product_name', - 'product_region', - 'product_servicecode', - 'product_storage', - 'product_to_location', - 'product_volume_api_name', ] - riRequiredColumns = [ + ri_required_columns = [ 'reservation_reservation_a_r_n', 'reservation_effective_cost', 'reservation_start_time', @@ -58,7 +46,7 @@ class CUR(CidBase): 'pricing_offering_class', 'pricing_purchase_option' ] - spRequiredColumns = [ + sp_required_columns = [ 'savings_plan_savings_plan_a_r_n', 'savings_plan_savings_plan_effective_cost', 'savings_plan_start_time', @@ -67,85 +55,125 @@ class CUR(CidBase): 'savings_plan_offering_type', 'savings_plan_payment_option' ] - _tableName = None + _table_name = None _metadata = None - _clients = dict() - _hasResourceIDs = None - _hasSavingsPlans = None - _hasReservations = None - _configured = None - _status = str() + _clients = {} - def __init__(self, session) -> None: - super().__init__(session) - @property def athena(self) -> Athena: - if not self._clients.get('athena'): - self._clients.update({ - 'athena': Athena(self.session) - }) - return self._clients.get('athena') + """ Get Athena Client """ + if 'athena' not in self._clients: + self._clients['athena'] = Athena(self.session) + return self._clients['athena'] @athena.setter def athena(self, client) -> Athena: - if not self._clients.get('athena'): - self._clients.update({ - 'athena': client - }) - return self._clients.get('athena') + """ Set Athena Client """ + self._clients['athena'] = client + return self._clients['athena'] @property - def configured(self) -> bool: - """ Check if AWS Data Catalog and Athena database exist """ - if self._configured is None: - if self.athena.CatalogName and self.athena.DatabaseName: - self._configured = True - else: - self._configured = False - return self._configured + def glue(self) -> Glue: + """ Get Glue Client """ + if 'glue' not in self._clients: + self._clients['glue'] = Glue(self.session) + return self._clients['glue'] + + @glue.setter + def glue(self, client) -> Glue: + """ Set Glue client """ + self._clients['glue'] = client + return self._clients['glue'] @property - def tableName(self) -> str: + def table_name(self) -> str: + """ Get Athena table name """ if self.metadata is None: raise CidCritical('Error: Cannot detect any CUR table. Hint: Check if AWS Lake Formation is activated on your account, verify that the LakeFormationEnabled parameter is set to yes on the deployment stack') return self.metadata.get('Name') @property - def hasResourceIDs(self) -> bool: - if self._configured and self._hasResourceIDs is None: - self._hasResourceIDs = 'line_item_resource_id' in self.fields - return self._hasResourceIDs + def has_resource_ids(self) -> bool: + """ Return True if CUR has resource ids """ + return 'line_item_resource_id' in self.fields @property - def hasReservations(self) -> bool: - if self._configured and self._hasReservations is None: - logger.debug(f'{self.riRequiredColumns}: {[c in self.fields for c in self.riRequiredColumns]}') - self._hasReservations=all([c in self.fields for c in self.riRequiredColumns]) - logger.info(f'Reserved Instances: {self._hasReservations}') - return self._hasReservations + def has_reservations(self) -> bool: + """ Return True if CUR has reservation fields """ + return all(col in self.fields for col in self.ri_required_columns) @property - def hasSavingsPlans(self) -> bool: - if self._configured and self._hasSavingsPlans is None: - logger.debug(f'{self.spRequiredColumns}: {[c in self.fields for c in self.spRequiredColumns]}') - self._hasSavingsPlans=all([c in self.fields for c in self.spRequiredColumns]) - logger.info(f'Savings Plans: {self._hasSavingsPlans}') - return self._hasSavingsPlans - + def has_savings_plans(self) -> bool: + """ Return True if CUR has savings plan """ + return all(col in self.fields for col in self.sp_required_columns) + + def get_type_of_column(self, column: str): + """ Return an Athena type of a given non existent CUR column """ + if column.startswith('cost_category_') or column.startswith('resource_tags_'): + return 'STRING' + for ending in ['_cost', '_factor', '_quantity', '_fee', '_amount', '_discount']: + if column.endswith(ending): + return 'DOUBLE' + if column.endswith('_date') and not column.endswith('_to_date'): + return 'TIMESTAMP' + special_cases = { + "reservation_amortized_upfront_cost_for_usage": "DOUBLE", + "reservation_amortized_upfront_fee_for_billing_period": "DOUBLE", + "reservation_recurring_fee_for_usage": "DOUBLE", + "reservation_unused_amortized_upfront_fee_for_billing_period": "DOUBLE", + "reservation_upfront_value": "DOUBLE", + "reservation_net_amortized_upfront_cost_for_usage": "DOUBLE", + "reservation_net_amortized_upfront_fee_for_billing_period": "DOUBLE", + "reservation_net_recurring_fee_for_usage": "DOUBLE", + "reservation_net_unused_amortized_upfront_fee_for_billing_period": "DOUBLE", + "reservation_net_upfront_value": "DOUBLE", + "savings_plan_total_commitment_to_date": "DOUBLE", + "savings_plan_savings_plan_rate": "DOUBLE", + "savings_plan_used_commitment": "DOUBLE", + "savings_plan_amortized_upfront_commitment_for_billing_period": "DOUBLE", + "savings_plan_net_amortized_upfront_commitment_for_billing_period": "DOUBLE", + "savings_plan_recurring_commitment_for_billing_period": "DOUBLE", + } + return special_cases.get(column, 'STRING') + + def ensure_column(self, column: str, column_type: str=None): + """ Ensure column is in the cur. If it is not there - add column """ + column = column.lower() + if column in [col.get('Name', '').lower() for col in self.metadata.get('Columns', [])]: + return + + crawler_name = self.metadata.get('Parameters', {}).get('UPDATED_BY_CRAWLER') + if crawler_name: + # Check Crawler Behavior - if it does not have a Configuration/CrawlerOutput/TablesAddOrUpdateBehavior == MergeNewColumns, it will override columns + try: + crawler = self.glue.get_crawler(crawler_name) + except self.glue.client.exceptions.ClientError as exc: + raise CidCritical(f'Column {column} is not found in CUR ({self.table_name}). And we were unable to check if crawler {crawler_name} is configured to override columns.') from exc + config = json.loads(crawler.get('Configuration', '{}')) + add_or_update = config.get('CrawlerOutput', {}).get('Tables', {}).get('AddOrUpdateBehavior') + if add_or_update != 'MergeNewColumns': + raise CidCritical(f'Column {column} is not found in CUR ({self.table_name}). And we were unable to add it as crawler {crawler_name} is configured to override columns. Change crawler settings and run again.') + + column_type = column_type or self.get_type_of_column(column) + try: + self.athena.query(f'ALTER TABLE {self.table_name} ADD COLUMNS ({column} {column_type})') + except (self.athena.client.exceptions.ClientError, CidCritical) as exc: + raise CidCritical(f'Column {column} is not found in CUR and we were unable to add it.') from exc + self._metadata = self.athena.get_table_metadata(self.table_name) # refresh table metadata + logger.critical(f"Column '{column}' was added to CUR ({self.table_name}).") def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=False) -> bool: """ return True if table metadata fits CUR definition. """ try: table = table or self.athena.get_table_metadata(name) - except Exception as exc: + except Exception as exc: #pylint: disable=broad-exception-caught logger.debug(exc) return False if not return_reason else (False, f'cannot get table {name}. {exc}.') table_name = table.get('Name') - columns = [cols.get('Name') for cols in table.get('Columns')] - missing_columns = [col for col in self.curRequiredColumns if col not in columns] + columns = [col.get('Name') for col in table.get('Columns')] + missing_columns = [col for col in self.cur_minimal_required_columns if col not in columns] if missing_columns: return False if not return_reason else (False, f"Table {table_name} does not contain columns: {','.join(missing_columns)}. You can try ALTER TABLE {table_name} ADD COLUMNS (missing_column string).") @@ -153,18 +181,19 @@ def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=Fal @property def metadata(self) -> dict: + """get Athena metadata for the table of CUR """ if self._metadata: return self._metadata if get_parameters().get('cur-table-name'): - self._tableName = get_parameters().get('cur-table-name') + self._table_name = get_parameters().get('cur-table-name') try: - self._metadata = self.athena.get_table_metadata(self._tableName) + self._metadata = self.athena.get_table_metadata(self._table_name) except self.athena.client.exceptions.ResourceNotFoundException as exc: - raise CidCritical('Provided cur-table-name "{self._tableName}" is not found. Please make sure the table exists.') from exc + raise CidCritical('Provided cur-table-name "{self._table_name}" is not found. Please make sure the table exists.') from exc res, message = self.table_is_cur(table=self._metadata, return_reason=True) if not res: - raise CidCritical(f'Table {self._tableName} does not look like CUR. {message}') + raise CidCritical(f'Table {self._table_name} does not look like CUR. {message}') else: # Look all tables and filter ones with CUR fields all_tables = self.athena.list_table_metadata() @@ -176,23 +205,24 @@ def metadata(self) -> dict: cur_tables = [tab for tab in all_tables if self.table_is_cur(table=tab)] if not cur_tables: - raise CidCritical(f'CUR table not found. (scanned {len(all_tables)} tables in Athena Database {self.athena.DatabaseName} in {self.athena.region}). But none has required fields: {self.curRequiredColumns}.') + raise CidCritical(f'CUR table not found. (scanned {len(all_tables)} tables in Athena Database {self.athena.DatabaseName} in {self.athena.region}). But none has required fields: {self.cur_minimal_required_columns}.') if len(cur_tables) == 1: self._metadata = cur_tables[0] - self._tableName = self._metadata.get('Name') - logger.info('1 CUR table found: %s', self._tableName) + self._table_name = self._metadata.get('Name') + logger.info('1 CUR table found: %s', self._table_name) elif len(cur_tables) > 1: - self._tableName = get_parameter( + self._table_name = get_parameter( param_name='cur-table-name', message="Multiple CUR tables found, please select one", choices=sorted([v.get('Name') for v in cur_tables], reverse=True), ) - self._metadata = self.athena.get_table_metadata(self._tableName) + self._metadata = self.athena.get_table_metadata(self._table_name) return self._metadata @property def fields(self) -> list: - return [v.get('Name') for v in self.metadata.get('Columns', list())] + """get CUR fields """ + return [col.get('Name') for col in self.metadata.get('Columns', [])] @property def tag_and_cost_category_fields(self) -> list: diff --git a/cid/helpers/glue.py b/cid/helpers/glue.py index ecb9353e..d6fdcc14 100644 --- a/cid/helpers/glue.py +++ b/cid/helpers/glue.py @@ -32,3 +32,8 @@ def delete_table(self, name, catalog, database): ) except self.client.exceptions.EntityNotFoundException: return True + + + def get_crawler(self, name: str): + """ GetCrawler """ + return self.client.get_crawler(Name=name)['Crawler']