From 14929cbcd9e77393586c20aa2886f371aa6e3932 Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sat, 23 Dec 2023 19:12:38 +0100 Subject: [PATCH 01/18] add management of CUR fields --- cid/builtin/core/data/resources.yaml | 127 ++++++++++++++++++++++++++- cid/common.py | 8 ++ cid/helpers/cur.py | 54 +++++++----- 3 files changed, 162 insertions(+), 27 deletions(-) diff --git a/cid/builtin/core/data/resources.yaml b/cid/builtin/core/data/resources.yaml index 431eac72..39f07c2d 100644 --- a/cid/builtin/core/data/resources.yaml +++ b/cid/builtin/core/data/resources.yaml @@ -267,7 +267,49 @@ views: riFile: cid/summary_view_ri.sql File: cid/summary_view.sql dependsOn: - cur: true + cur: + - bill_billing_period_start_date + - line_item_usage_start_date + - bill_payer_account_id + - line_item_usage_account_id + - bill_invoice_id + - line_item_line_item_type + - savings_plan_savings_plan_a_r_n + - reservation_reservation_a_r_n + - line_item_usage_type + - purchase_option + - line_item_product_code + - product_product_name + - product_servicecode + - product_product_family + - line_item_operation + - line_item_line_item_description + - line_item_availability_zone + - product_region + - product_operating_system + - product_tenancy + - product_physical_processor + - product_processor_features + - product_database_engine + - product_group + - product_from_location + - product_to_location + - product_current_generation + - line_item_legal_entity + - bill_billing_entity + - pricing_unit + - Line_item_resource_id + - line_item_usage_amount + - line_item_unblended_cost + - savings_plan_savings_plan_effective_cost + - savings_plan_total_commitment_to_date + - savings_plan_used_commitment + - reservation_effective_cost + - reservation_unused_amortized_upfront_fee_for_billing_period + - reservation_unused_recurring_fee + - pricing_public_on_demand_cost + - savings_plan_amortized_upfront_commitment_for_billing_period + ec2_running_cost: spriFile: cid/ec2_running_cost_sp_ri.sql @@ -286,6 +328,19 @@ views: File: cid/s3.sql dependsOn: cur: true + - bill_billing_period_start_date + - line_item_usage_start_date + - bill_payer_account_id + - line_item_usage_account_id + - line_item_resource_id + - line_item_product_code + - line_item_operation + - product_region + - line_item_line_item_type + - pricing_unit + - line_item_usage_amount + - line_item_unblended_cost + - pricing_public_on_demand_cost ri_sp_mapping: spriFile: cid/ri_sp_mapping_sp_ri.sql @@ -293,21 +348,85 @@ views: riFile: cid/ri_sp_mapping_ri.sql File: cid/ri_sp_mapping.sql dependsOn: - cur: true + cur: + - bill_billing_period_start_date + - bill_payer_account_id + - savings_plan_savings_plan_a_r_n + - savings_plan_end_time + - reservation_reservation_a_r_n + - pricing_lease_contract_length + - pricing_offering_class + - pricing_purchase_option + - line_item_line_item_type + hourly_view: spriFile: cudos/hourly_view_sp_ri.sql spFile: cudos/hourly_view_sp.sql riFile: cudos/hourly_view_ri.sql File: cudos/hourly_view.sql dependsOn: - cur: true + cur: + - line_item_product_code + - product_servicecode + - line_item_operation + - line_item_line_item_type + - line_item_usage_type + - line_item_line_item_description + - pricing_unit + - product_region + - pricing_term + - bill_billing_period_start_date + - line_item_usage_start_date + - bill_payer_account_id + - line_item_usage_account_id + - savings_plan_savings_plan_a_r_n + - reservation_reservation_a_r_n + - line_item_unblended_cost + - reservation_effective_cost + - savings_plan_savings_plan_effective_cost + - line_item_usage_amount + resource_view: spriFile: cudos/resource_view_sp_ri.sql spFile: cudos/resource_view_sp.sql riFile: cudos/resource_view_ri.sql File: cudos/resource_view.sql dependsOn: - cur: true + cur: + - line_item_usage_start_date + - bill_payer_account_id + - line_item_usage_account_id + - bill_billing_entity + - product_product_name + - line_item_resource_id + - line_item_product_code + - line_item_operation + - line_item_line_item_type + - line_item_usage_type + - pricing_unit + - product_region + - line_item_line_item_description + - line_item_legal_entity + - pricing_term + - product_database_engine + - product_deployment_option + - product_from_location + - product_group + - product_instance_type + - product_instance_type_family + - product_operating_system + - product_product_family + - product_servicecode + - product_storage + - product_to_location + - product_volume_api_name + - reservation_reservation_a_r_n + - savings_plan_savings_plan_a_r_n + - savings_plan_savings_plan_effective_cost + - reservation_effective_cost + - line_item_usage_amount" + - line_item_unblended_cost + # Trends daily_anomaly_detection: File: trends/daily_anomaly_detection.sql diff --git a/cid/common.py b/cid/common.py index 5e4eeb36..8f1323af 100644 --- a/cid/common.py +++ b/cid/common.py @@ -1485,6 +1485,14 @@ def create_or_update_view(self, view_name: str, recursive: bool=True, update: bo self.accountMap.create(view_name) #FIXME: add or_update return + # Process CUR columns + if isinstance(dependency_views.get('cur'), list): + for column in dependency_views.get('cur'): + self.cur.ensure_column(column) + elif isinstance(dependency_views.get('cur'), dict): + for column, column_type in dependency_views.get('cur').items(): + self.cur.ensure_column(column, column_type) + # Create a view logger.info(f'Getting view definition {view_name}') view_definition = self.get_definition("view", name=view_name) diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index 4cafda21..a10a7ae1 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -10,7 +10,7 @@ class CUR(CidBase): - curRequiredColumns = [ + cur_minimal_required_columns = [ 'bill_bill_type', 'bill_billing_entity', 'bill_billing_period_end_date', @@ -34,22 +34,22 @@ class CUR(CidBase): 'line_item_usage_type', 'pricing_term', 'pricing_unit', - 'product_database_engine', - 'product_deployment_option', - 'product_from_location', - 'product_group', - 'product_instance_type', - 'product_instance_type_family', - 'product_operating_system', - 'product_product_family', - 'product_product_name', - 'product_region', - 'product_servicecode', - 'product_storage', - 'product_to_location', - 'product_volume_api_name', + # 'product_database_engine', + # 'product_deployment_option', + # 'product_from_location', + # 'product_group', + # 'product_instance_type', + # 'product_instance_type_family', + # 'product_operating_system', + # 'product_product_family', + # 'product_product_name', + # 'product_region', + # 'product_servicecode', + # 'product_storage', + # 'product_to_location', + # 'product_volume_api_name', ] - riRequiredColumns = [ + ri_required_columns = [ 'reservation_reservation_a_r_n', 'reservation_effective_cost', 'reservation_start_time', @@ -58,7 +58,7 @@ class CUR(CidBase): 'pricing_offering_class', 'pricing_purchase_option' ] - spRequiredColumns = [ + sp_required_columns = [ 'savings_plan_savings_plan_a_r_n', 'savings_plan_savings_plan_effective_cost', 'savings_plan_start_time', @@ -121,19 +121,27 @@ def hasResourceIDs(self) -> bool: @property def hasReservations(self) -> bool: if self._configured and self._hasReservations is None: - logger.debug(f'{self.riRequiredColumns}: {[c in self.fields for c in self.riRequiredColumns]}') - self._hasReservations=all([c in self.fields for c in self.riRequiredColumns]) + logger.debug(f'{self.ri_required_columns}: {[c in self.fields for c in self.ri_required_columns]}') + self._hasReservations=all([c in self.fields for c in self.ri_required_columns]) logger.info(f'Reserved Instances: {self._hasReservations}') return self._hasReservations @property def hasSavingsPlans(self) -> bool: if self._configured and self._hasSavingsPlans is None: - logger.debug(f'{self.spRequiredColumns}: {[c in self.fields for c in self.spRequiredColumns]}') - self._hasSavingsPlans=all([c in self.fields for c in self.spRequiredColumns]) + logger.debug(f'{self.sp_required_columns}: {[c in self.fields for c in self.sp_required_columns]}') + self._hasSavingsPlans=all([c in self.fields for c in self.sp_required_columns]) logger.info(f'Savings Plans: {self._hasSavingsPlans}') return self._hasSavingsPlans + def ensure_column(self, column: str, column_type: str='STRING'): + if column in [col.get('Name') for col in self.metadata.get('Columns', [])]: + # TODO: check type + return + # TODO: check if crawler will override this column + # TODO: ask user? + self.athena.query(f'ALTER TABLE {self._tableName} ADD COLUMNS ({column} {column_type})') + self._metadata = self.athena.get_table_metadata(self._tableName) # refresh table metadata def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=False) -> bool: """ return True if table metadata fits CUR definition. """ @@ -145,7 +153,7 @@ def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=Fal table_name = table.get('Name') columns = [cols.get('Name') for cols in table.get('Columns')] - missing_columns = [col for col in self.curRequiredColumns if col not in columns] + missing_columns = [col for col in self.cur_minimal_required_columns if col not in columns] if missing_columns: return False if not return_reason else (False, f"Table {table_name} does not contain columns: {','.join(missing_columns)}. You can try ALTER TABLE {table_name} ADD COLUMNS (missing_column string).") @@ -176,7 +184,7 @@ def metadata(self) -> dict: cur_tables = [tab for tab in all_tables if self.table_is_cur(table=tab)] if not cur_tables: - raise CidCritical(f'CUR table not found. (scanned {len(all_tables)} tables in Athena Database {self.athena.DatabaseName} in {self.athena.region}). But none has required fields: {self.curRequiredColumns}.') + raise CidCritical(f'CUR table not found. (scanned {len(all_tables)} tables in Athena Database {self.athena.DatabaseName} in {self.athena.region}). But none has required fields: {self.cur_minimal_required_columns}.') if len(cur_tables) == 1: self._metadata = cur_tables[0] self._tableName = self._metadata.get('Name') From d05715696aace51cccbad096db9885da3b4112c4 Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sat, 23 Dec 2023 19:25:46 +0100 Subject: [PATCH 02/18] wip --- cid/builtin/core/data/resources.yaml | 29 ++++++++++++++++++++++++---- cid/common.py | 10 ++++++---- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/cid/builtin/core/data/resources.yaml b/cid/builtin/core/data/resources.yaml index 39f07c2d..7db7fe5a 100644 --- a/cid/builtin/core/data/resources.yaml +++ b/cid/builtin/core/data/resources.yaml @@ -310,24 +310,45 @@ views: - pricing_public_on_demand_cost - savings_plan_amortized_upfront_commitment_for_billing_period - ec2_running_cost: spriFile: cid/ec2_running_cost_sp_ri.sql spFile: cid/ec2_running_cost_sp.sql riFile: cid/ec2_running_cost_ri.sql File: cid/ec2_running_cost.sql dependsOn: - cur: true + cur: + - bill_billing_period_start_date + - line_item_usage_start_date + - bill_payer_account_id + - line_item_usage_account_id + - line_item_usage_type + - reservation_reservation_a_r_n + - savings_plan_savings_plan_a_r_n + - line_item_line_item_type + - savings_plan_savings_plan_effective_cost + - reservation_effective_cost + - line_item_unblended_cost + - line_item_usage_amount compute_savings_plan_eligible_spend: File: cid/compute_savings_plan_eligible_spend.sql dependsOn: cur: true + - bill_payer_account_id + - line_item_usage_account_id + - bill_billing_period_start_date + - line_item_usage_start_date + - line_item_operation + - line_item_usage_type + - product_servicecode + - line_item_line_item_type + - line_item_product_code + - line_item_unblended_cost s3_view: File: cid/s3.sql dependsOn: - cur: true + cur: - bill_billing_period_start_date - line_item_usage_start_date - bill_payer_account_id @@ -424,7 +445,7 @@ views: - savings_plan_savings_plan_a_r_n - savings_plan_savings_plan_effective_cost - reservation_effective_cost - - line_item_usage_amount" + - line_item_usage_amount - line_item_unblended_cost # Trends diff --git a/cid/common.py b/cid/common.py index 8f1323af..8e8f6883 100644 --- a/cid/common.py +++ b/cid/common.py @@ -1472,12 +1472,13 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=Non def create_or_update_view(self, view_name: str, recursive: bool=True, update: bool=False) -> None: - # For account mappings create a view using a special helper - if view_name in self._visited_views: # avoid checking a views multiple times in one cid session + # Avoid checking a views multiple times in one cid session + if view_name in self._visited_views: return - logger.info(f'Processing view: {view_name}') self._visited_views.append(view_name) + logger.info(f'Processing view: {view_name}') + # For account mappings create a view using a special helper if view_name in ['account_map', 'aws_accounts']: if view_name in self.athena._metadata.keys(): print(f'Account map {view_name} exists. Skipping.') @@ -1506,7 +1507,8 @@ def create_or_update_view(self, view_name: str, recursive: bool=True, update: bo if recursive: dependency_views = view_definition.get('dependsOn', dict()).get('views', list()) - if 'cur' in dependency_views: dependency_views.remove('cur') + if 'cur' in dependency_views: + dependency_views.remove('cur') # Discover dependency views (may not be discovered earlier) self.athena.discover_views(dependency_views) logger.info(f"Dependency views: {', '.join(dependency_views)}" if dependency_views else 'No dependency views') From 3423e1f15aca0f8ee85429f01596fc25c9e106ec Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sat, 23 Dec 2023 20:47:36 +0100 Subject: [PATCH 03/18] better type management --- cid/helpers/cur.py | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index a10a7ae1..248df277 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -134,12 +134,36 @@ def hasSavingsPlans(self) -> bool: logger.info(f'Savings Plans: {self._hasSavingsPlans}') return self._hasSavingsPlans - def ensure_column(self, column: str, column_type: str='STRING'): + def get_type_of_column(self, column: str): + """ Return an Athena type of a given non existent CUR column """ + # TODO: probably there is a better way to determine the column type in CUR. Pricing API? Full list of columns? + for ending in ['_cost', '_factor', '_quantity', '_fee']: + if column.endswith(ending): + return 'FLOAT' + if column.endswith('_date') and not column.endswith('_to_date'): + return 'DATE' + SPECIAL = { + "reservation_amortized_upfront_cost_for_usage": "FLOAT", + "reservation_amortized_upfront_fee_for_billing_period": "FLOAT", + "reservation_recurring_fee_for_usage": "FLOAT", + "reservation_unused_amortized_upfront_fee_for_billing_period": "FLOAT", + "reservation_upfront_value": "FLOAT", + "savings_plan_total_commitment_to_date": "FLOAT", + "savings_plan_savings_plan_rate": "FLOAT", + "savings_plan_used_commitment": "FLOAT", + "savings_plan_amortized_upfront_commitment_for_billing_period": "FLOAT", + "savings_plan_recurring_commitment_for_billing_period": "FLOAT", + } + return SPECIAL.get(column, 'STRING') + + def ensure_column(self, column: str, column_type: str=None): + """ Ensure column is in the cur. If it is not there - add column """ if column in [col.get('Name') for col in self.metadata.get('Columns', [])]: - # TODO: check type return + column_type = column_type or self.get_type_of_column(column) # TODO: check if crawler will override this column # TODO: ask user? + self.metadata.get('Properties') self.athena.query(f'ALTER TABLE {self._tableName} ADD COLUMNS ({column} {column_type})') self._metadata = self.athena.get_table_metadata(self._tableName) # refresh table metadata From b23941a3dcfab57e46ce3667d5d162efa35aba5b Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sat, 23 Dec 2023 22:14:11 +0100 Subject: [PATCH 04/18] refactoring --- cid/helpers/cur.py | 44 ++++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index 248df277..b54a0b96 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -136,23 +136,30 @@ def hasSavingsPlans(self) -> bool: def get_type_of_column(self, column: str): """ Return an Athena type of a given non existent CUR column """ - # TODO: probably there is a better way to determine the column type in CUR. Pricing API? Full list of columns? - for ending in ['_cost', '_factor', '_quantity', '_fee']: + if column.startswith('cost_category_') or column.startswith('resource_tags_'): + return 'STRING' + for ending in ['_cost', '_factor', '_quantity', '_fee', '_amount', '_discount']: if column.endswith(ending): - return 'FLOAT' + return 'DOUBLE' if column.endswith('_date') and not column.endswith('_to_date'): - return 'DATE' + return 'TIMESTAMP' SPECIAL = { - "reservation_amortized_upfront_cost_for_usage": "FLOAT", - "reservation_amortized_upfront_fee_for_billing_period": "FLOAT", - "reservation_recurring_fee_for_usage": "FLOAT", - "reservation_unused_amortized_upfront_fee_for_billing_period": "FLOAT", - "reservation_upfront_value": "FLOAT", - "savings_plan_total_commitment_to_date": "FLOAT", - "savings_plan_savings_plan_rate": "FLOAT", - "savings_plan_used_commitment": "FLOAT", - "savings_plan_amortized_upfront_commitment_for_billing_period": "FLOAT", - "savings_plan_recurring_commitment_for_billing_period": "FLOAT", + "reservation_amortized_upfront_cost_for_usage": "DOUBLE", + "reservation_amortized_upfront_fee_for_billing_period": "DOUBLE", + "reservation_recurring_fee_for_usage": "DOUBLE", + "reservation_unused_amortized_upfront_fee_for_billing_period": "DOUBLE", + "reservation_upfront_value": "DOUBLE", + "reservation_net_amortized_upfront_cost_for_usage": "DOUBLE", + "reservation_net_amortized_upfront_fee_for_billing_period": "DOUBLE", + "reservation_net_recurring_fee_for_usage": "DOUBLE", + "reservation_net_unused_amortized_upfront_fee_for_billing_period": "DOUBLE", + "reservation_net_upfront_value": "DOUBLE", + "savings_plan_total_commitment_to_date": "DOUBLE", + "savings_plan_savings_plan_rate": "DOUBLE", + "savings_plan_used_commitment": "DOUBLE", + "savings_plan_amortized_upfront_commitment_for_billing_period": "DOUBLE", + "savings_plan_net_amortized_upfront_commitment_for_billing_period": "DOUBLE", + "savings_plan_recurring_commitment_for_billing_period": "DOUBLE", } return SPECIAL.get(column, 'STRING') @@ -161,10 +168,11 @@ def ensure_column(self, column: str, column_type: str=None): if column in [col.get('Name') for col in self.metadata.get('Columns', [])]: return column_type = column_type or self.get_type_of_column(column) - # TODO: check if crawler will override this column - # TODO: ask user? - self.metadata.get('Properties') - self.athena.query(f'ALTER TABLE {self._tableName} ADD COLUMNS ({column} {column_type})') + try: + self.athena.query(f'ALTER TABLE {self._tableName} ADD COLUMNS ({column} {column_type})') + except self.athena.client.exceptions.ClientError as exc: + raise CidCritical(f'Column {column} is not found in CUR and we were unable to add it. Please check FAQ.') from exc + logger.critical(f'Column {column} was added to CUR. Please make sure crawler do not override that columns.') self._metadata = self.athena.get_table_metadata(self._tableName) # refresh table metadata def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=False) -> bool: From c6caf789c6088186a2a130cd7dab8a834106dd4f Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sat, 23 Dec 2023 22:48:18 +0100 Subject: [PATCH 05/18] fixes --- cid/common.py | 19 ++++++++++--------- cid/helpers/cur.py | 7 ++++--- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/cid/common.py b/cid/common.py index 8e8f6883..0e46b920 100644 --- a/cid/common.py +++ b/cid/common.py @@ -1486,14 +1486,6 @@ def create_or_update_view(self, view_name: str, recursive: bool=True, update: bo self.accountMap.create(view_name) #FIXME: add or_update return - # Process CUR columns - if isinstance(dependency_views.get('cur'), list): - for column in dependency_views.get('cur'): - self.cur.ensure_column(column) - elif isinstance(dependency_views.get('cur'), dict): - for column, column_type in dependency_views.get('cur').items(): - self.cur.ensure_column(column, column_type) - # Create a view logger.info(f'Getting view definition {view_name}') view_definition = self.get_definition("view", name=view_name) @@ -1504,9 +1496,18 @@ def create_or_update_view(self, view_name: str, recursive: bool=True, update: bo logger.info(f"Definition is unavailable {view_name}") return logger.debug(f'View definition: {view_definition}') + dependencies = view_definition.get('dependsOn', {}) + + # Process CUR columns + if isinstance(dependencies.get('cur'), list): + for column in dependencies.get('cur'): + self.cur.ensure_column(column) + elif isinstance(dependencies.get('cur'), dict): + for column, column_type in dependencies.get('cur').items(): + self.cur.ensure_column(column, column_type) if recursive: - dependency_views = view_definition.get('dependsOn', dict()).get('views', list()) + dependency_views = dependencies.get('views', []) if 'cur' in dependency_views: dependency_views.remove('cur') # Discover dependency views (may not be discovered earlier) diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index b54a0b96..794445dd 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -165,14 +165,15 @@ def get_type_of_column(self, column: str): def ensure_column(self, column: str, column_type: str=None): """ Ensure column is in the cur. If it is not there - add column """ - if column in [col.get('Name') for col in self.metadata.get('Columns', [])]: + column = column.lower() + if column in [col.get('Name', '').lower() for col in self.metadata.get('Columns', [])]: return column_type = column_type or self.get_type_of_column(column) try: self.athena.query(f'ALTER TABLE {self._tableName} ADD COLUMNS ({column} {column_type})') - except self.athena.client.exceptions.ClientError as exc: + except (self.athena.client.exceptions.ClientError, CidCritical) as exc: raise CidCritical(f'Column {column} is not found in CUR and we were unable to add it. Please check FAQ.') from exc - logger.critical(f'Column {column} was added to CUR. Please make sure crawler do not override that columns.') + logger.critical(f"Column '{column}' was added to CUR ({self._tableName}). Please make sure crawler do not override that columns.") self._metadata = self.athena.get_table_metadata(self._tableName) # refresh table metadata def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=False) -> bool: From d0b85786ba7217a1ca5634fb11cd058c38c5c8c3 Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sat, 23 Dec 2023 23:17:45 +0100 Subject: [PATCH 06/18] add warning if crawler is not well configured --- cid/common.py | 1 + cid/helpers/cur.py | 25 +++++++++++++++++++++++-- cid/helpers/glue.py | 5 +++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/cid/common.py b/cid/common.py index 0e46b920..b5a61358 100644 --- a/cid/common.py +++ b/cid/common.py @@ -125,6 +125,7 @@ def cur(self) -> CUR: if not self._clients.get('cur'): _cur = CUR(self.base.session) _cur.athena = self.athena + _cur.glue = self.glue print('Checking if CUR is enabled and available...') if not _cur.configured: diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index 794445dd..a18ff28d 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -2,7 +2,7 @@ import logging from cid.base import CidBase -from cid.helpers import Athena +from cid.helpers import Athena, Glue from cid.utils import get_parameter, get_parameters from cid.exceptions import CidCritical @@ -96,6 +96,18 @@ def athena(self, client) -> Athena: }) return self._clients.get('athena') + @property + def glue(self) -> Glue: + if not self._clients.get('glue'): + self._clients['glue'] = Glue(self.session) + return self._clients.get('glue') + + @glue.setter + def glue(self, client) -> Glue: + if not self._clients.get('glue'): + self._clients['glue'] = client + return self._clients.get('glue') + @property def configured(self) -> bool: """ Check if AWS Data Catalog and Athena database exist """ @@ -168,13 +180,22 @@ def ensure_column(self, column: str, column_type: str=None): column = column.lower() if column in [col.get('Name', '').lower() for col in self.metadata.get('Columns', [])]: return + + crawler_name = self.metadata.get('Parameters', {}).get('UPDATED_BY_CRAWLER') + if crawler_name: + # Check Crawler Behavior - if it does not have a Configuration/CrawlerOutput/TablesAddOrUpdateBehavior == MergeNewColumns, it will override columns + config = json.loads(self.glue.get_crawler(crawler_name).get('Configuration', '{}')) + add_or_update = config.get('CrawlerOutput', {}).get('Tables', {}).get('AddOrUpdateBehavior') + if add_or_update != 'MergeNewColumns': + raise CidCritical(f'Column {column} is not found in CUR. And we were unable to add it as crawler {crawler_name} is configured to override columns.') + column_type = column_type or self.get_type_of_column(column) try: self.athena.query(f'ALTER TABLE {self._tableName} ADD COLUMNS ({column} {column_type})') except (self.athena.client.exceptions.ClientError, CidCritical) as exc: raise CidCritical(f'Column {column} is not found in CUR and we were unable to add it. Please check FAQ.') from exc - logger.critical(f"Column '{column}' was added to CUR ({self._tableName}). Please make sure crawler do not override that columns.") self._metadata = self.athena.get_table_metadata(self._tableName) # refresh table metadata + logger.critical(f"Column '{column}' was added to CUR ({self._tableName}). Please make sure crawler do not override that columns. Crawler='{crawler_name}'") def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=False) -> bool: """ return True if table metadata fits CUR definition. """ diff --git a/cid/helpers/glue.py b/cid/helpers/glue.py index ecb9353e..d6fdcc14 100644 --- a/cid/helpers/glue.py +++ b/cid/helpers/glue.py @@ -32,3 +32,8 @@ def delete_table(self, name, catalog, database): ) except self.client.exceptions.EntityNotFoundException: return True + + + def get_crawler(self, name: str): + """ GetCrawler """ + return self.client.get_crawler(Name=name)['Crawler'] From 350825651647c09bb0a0bfef35563f09ecacf126 Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sat, 23 Dec 2023 23:24:23 +0100 Subject: [PATCH 07/18] sort --- cid/builtin/core/data/resources.yaml | 155 +++++++++++++-------------- 1 file changed, 77 insertions(+), 78 deletions(-) diff --git a/cid/builtin/core/data/resources.yaml b/cid/builtin/core/data/resources.yaml index 7db7fe5a..d4f67d97 100644 --- a/cid/builtin/core/data/resources.yaml +++ b/cid/builtin/core/data/resources.yaml @@ -268,47 +268,46 @@ views: File: cid/summary_view.sql dependsOn: cur: + - bill_billing_entity - bill_billing_period_start_date - - line_item_usage_start_date - - bill_payer_account_id - - line_item_usage_account_id - bill_invoice_id + - bill_payer_account_id + - line_item_availability_zone + - line_item_legal_entity + - line_item_line_item_description - line_item_line_item_type - - savings_plan_savings_plan_a_r_n - - reservation_reservation_a_r_n - - line_item_usage_type - - purchase_option - - line_item_product_code - - product_product_name - - product_servicecode - - product_product_family - line_item_operation - - line_item_line_item_description - - line_item_availability_zone - - product_region + - line_item_product_code + - Line_item_resource_id + - line_item_unblended_cost + - line_item_usage_account_id + - line_item_usage_amount + - line_item_usage_start_date + - line_item_usage_type + - pricing_public_on_demand_cost + - pricing_unit + - product_current_generation + - product_database_engine + - product_from_location + - product_group - product_operating_system - - product_tenancy - product_physical_processor - product_processor_features - - product_database_engine - - product_group - - product_from_location + - product_product_family + - product_product_name + - product_region + - product_servicecode + - product_tenancy - product_to_location - - product_current_generation - - line_item_legal_entity - - bill_billing_entity - - pricing_unit - - Line_item_resource_id - - line_item_usage_amount - - line_item_unblended_cost - - savings_plan_savings_plan_effective_cost - - savings_plan_total_commitment_to_date - - savings_plan_used_commitment - reservation_effective_cost + - reservation_reservation_a_r_n - reservation_unused_amortized_upfront_fee_for_billing_period - reservation_unused_recurring_fee - - pricing_public_on_demand_cost - savings_plan_amortized_upfront_commitment_for_billing_period + - savings_plan_savings_plan_a_r_n + - savings_plan_savings_plan_effective_cost + - savings_plan_total_commitment_to_date + - savings_plan_used_commitment ec2_running_cost: spriFile: cid/ec2_running_cost_sp_ri.sql @@ -318,50 +317,50 @@ views: dependsOn: cur: - bill_billing_period_start_date - - line_item_usage_start_date - bill_payer_account_id + - line_item_line_item_type + - line_item_unblended_cost - line_item_usage_account_id + - line_item_usage_amount + - line_item_usage_start_date - line_item_usage_type + - reservation_effective_cost - reservation_reservation_a_r_n - savings_plan_savings_plan_a_r_n - - line_item_line_item_type - savings_plan_savings_plan_effective_cost - - reservation_effective_cost - - line_item_unblended_cost - - line_item_usage_amount compute_savings_plan_eligible_spend: File: cid/compute_savings_plan_eligible_spend.sql dependsOn: cur: true + - bill_billing_period_start_date - bill_payer_account_id + - line_item_line_item_type + - line_item_operation + - line_item_product_code + - line_item_unblended_cost - line_item_usage_account_id - - bill_billing_period_start_date - line_item_usage_start_date - - line_item_operation - line_item_usage_type - product_servicecode - - line_item_line_item_type - - line_item_product_code - - line_item_unblended_cost s3_view: File: cid/s3.sql dependsOn: cur: - bill_billing_period_start_date - - line_item_usage_start_date - bill_payer_account_id - - line_item_usage_account_id - - line_item_resource_id - - line_item_product_code - - line_item_operation - - product_region - line_item_line_item_type - - pricing_unit - - line_item_usage_amount + - line_item_operation + - line_item_product_code + - line_item_resource_id - line_item_unblended_cost + - line_item_usage_account_id + - line_item_usage_amount + - line_item_usage_start_date - pricing_public_on_demand_cost + - pricing_unit + - product_region ri_sp_mapping: spriFile: cid/ri_sp_mapping_sp_ri.sql @@ -372,13 +371,13 @@ views: cur: - bill_billing_period_start_date - bill_payer_account_id - - savings_plan_savings_plan_a_r_n - - savings_plan_end_time - - reservation_reservation_a_r_n + - line_item_line_item_type - pricing_lease_contract_length - pricing_offering_class - pricing_purchase_option - - line_item_line_item_type + - reservation_reservation_a_r_n + - savings_plan_end_time + - savings_plan_savings_plan_a_r_n hourly_view: spriFile: cudos/hourly_view_sp_ri.sql @@ -387,25 +386,25 @@ views: File: cudos/hourly_view.sql dependsOn: cur: - - line_item_product_code - - product_servicecode - - line_item_operation + - bill_billing_period_start_date + - bill_payer_account_id + - line_item_line_item_description - line_item_line_item_type + - line_item_operation + - line_item_product_code + - line_item_unblended_cost + - line_item_usage_account_id + - line_item_usage_amount + - line_item_usage_start_date - line_item_usage_type - - line_item_line_item_description + - pricing_term - pricing_unit - product_region - - pricing_term - - bill_billing_period_start_date - - line_item_usage_start_date - - bill_payer_account_id - - line_item_usage_account_id - - savings_plan_savings_plan_a_r_n - - reservation_reservation_a_r_n - - line_item_unblended_cost + - product_servicecode - reservation_effective_cost + - reservation_reservation_a_r_n + - savings_plan_savings_plan_a_r_n - savings_plan_savings_plan_effective_cost - - line_item_usage_amount resource_view: spriFile: cudos/resource_view_sp_ri.sql @@ -414,21 +413,21 @@ views: File: cudos/resource_view.sql dependsOn: cur: - - line_item_usage_start_date - - bill_payer_account_id - - line_item_usage_account_id - bill_billing_entity - - product_product_name - - line_item_resource_id - - line_item_product_code - - line_item_operation + - bill_payer_account_id + - line_item_legal_entity + - line_item_line_item_description - line_item_line_item_type + - line_item_operation + - line_item_product_code + - line_item_resource_id + - line_item_unblended_cost + - line_item_usage_account_id + - line_item_usage_amount + - line_item_usage_start_date - line_item_usage_type - - pricing_unit - - product_region - - line_item_line_item_description - - line_item_legal_entity - pricing_term + - pricing_unit - product_database_engine - product_deployment_option - product_from_location @@ -437,16 +436,16 @@ views: - product_instance_type_family - product_operating_system - product_product_family + - product_product_name + - product_region - product_servicecode - product_storage - product_to_location - product_volume_api_name + - reservation_effective_cost - reservation_reservation_a_r_n - savings_plan_savings_plan_a_r_n - savings_plan_savings_plan_effective_cost - - reservation_effective_cost - - line_item_usage_amount - - line_item_unblended_cost # Trends daily_anomaly_detection: From 6d8f31439e2d9ba82dd5e97a6c8f8115cd0ebe5e Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sat, 23 Dec 2023 23:28:13 +0100 Subject: [PATCH 08/18] fix import --- cid/helpers/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cid/helpers/__init__.py b/cid/helpers/__init__.py index 89946c66..41103d4c 100644 --- a/cid/helpers/__init__.py +++ b/cid/helpers/__init__.py @@ -1,6 +1,6 @@ from cid.helpers.athena import Athena -from cid.helpers.cur import CUR from cid.helpers.glue import Glue +from cid.helpers.cur import CUR from cid.helpers.diff import diff from cid.helpers.quicksight import QuickSight, Dashboard, Dataset, Datasource, Template from cid.helpers.csv2view import csv2view From 900a42ab21deb11a664127b7c3817207f4753243 Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sun, 24 Dec 2023 10:50:52 +0100 Subject: [PATCH 09/18] refactoring cur --- cid/common.py | 24 ++++---- cid/helpers/account_map.py | 4 +- cid/helpers/cur.py | 123 +++++++++++++++++-------------------- 3 files changed, 70 insertions(+), 81 deletions(-) diff --git a/cid/common.py b/cid/common.py index b5a61358..9aee6e6d 100644 --- a/cid/common.py +++ b/cid/common.py @@ -131,12 +131,12 @@ def cur(self) -> CUR: if not _cur.configured: raise CidCritical("Error: please ensure CUR is enabled, if yes allow it some time to propagate") - print(f'\tAthena table: {_cur.tableName}') - print(f"\tResource IDs: {'yes' if _cur.hasResourceIDs else 'no'}") - if not _cur.hasResourceIDs: + print(f'\tAthena table: {_cur.table_name}') + print(f"\tResource IDs: {'yes' if _cur.has_resource_ids else 'no'}") + if not _cur.has_resource_ids: raise CidCritical("Error: CUR has to be created with Resource IDs") - print(f"\tSavingsPlans: {'yes' if _cur.hasSavingsPlans else 'no'}") - print(f"\tReserved Instances: {'yes' if _cur.hasReservations else 'no'}") + print(f"\tSavingsPlans: {'yes' if _cur.has_savings_plans else 'no'}") + print(f"\tReserved Instances: {'yes' if _cur.has_reservations else 'no'}") print('\n') self._clients.update({ 'cur': _cur @@ -1361,7 +1361,7 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=Non # Check for required views _views = dataset_definition.get('dependsOn', {}).get('views', []) - required_views = [(self.cur.tableName if cur_required and name =='${cur_table_name}' else name) for name in _views] + required_views = [(self.cur.table_name if cur_required and name =='${cur_table_name}' else name) for name in _views] self.athena.discover_views(required_views) found_views = utils.intersection(required_views, self.athena._metadata.keys()) @@ -1370,7 +1370,7 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=Non if recursive: print(f"Detected views: {', '.join(found_views)}") for view_name in found_views: - if cur_required and view_name == self.cur.tableName: + if cur_required and view_name == self.cur.table_name: logger.debug(f'Dependancy view {view_name} is a CUR. Skip.') continue if view_name == 'account_map': @@ -1389,7 +1389,7 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=Non columns_tpl = { 'athena_datasource_arn': athena_datasource.arn, 'athena_database_name': self.athena.DatabaseName, - 'cur_table_name': self.cur.tableName if cur_required else None + 'cur_table_name': self.cur.table_name if cur_required else None } logger.debug(f'dataset_id={dataset_id}') @@ -1600,11 +1600,11 @@ def get_view_query(self, view_name: str) -> str: # View path view_definition = self.get_definition("view", name=view_name) cur_required = view_definition.get('dependsOn', dict()).get('cur') - if cur_required and self.cur.hasSavingsPlans and self.cur.hasReservations and view_definition.get('spriFile'): + if cur_required and self.cur.has_savings_plans and self.cur.has_reservations and view_definition.get('spriFile'): view_definition['File'] = view_definition.get('spriFile') - elif cur_required and self.cur.hasSavingsPlans and view_definition.get('spFile'): + elif cur_required and self.cur.has_savings_plans and view_definition.get('spFile'): view_definition['File'] = view_definition.get('spFile') - elif cur_required and self.cur.hasReservations and view_definition.get('riFile'): + elif cur_required and self.cur.has_reservations and view_definition.get('riFile'): view_definition['File'] = view_definition.get('riFile') elif view_definition.get('File') or view_definition.get('Data') or view_definition.get('data'): pass @@ -1621,7 +1621,7 @@ def get_view_query(self, view_name: str) -> str: # Prepare template parameters columns_tpl = { - 'cur_table_name': self.cur.tableName if cur_required else None, + 'cur_table_name': self.cur.table_name if cur_required else None, 'athenaTableName': view_name, 'athena_database_name': self.athena.DatabaseName, } diff --git a/cid/helpers/account_map.py b/cid/helpers/account_map.py index fae3a7c8..dd44176e 100644 --- a/cid/helpers/account_map.py +++ b/cid/helpers/account_map.py @@ -159,7 +159,7 @@ def create(self, name) -> bool: # Fill in TPLs columns_tpl = { 'metadata_table_name': self._athena_table_name, - 'cur_table_name': self.cur.tableName # only for trends + 'cur_table_name': self.cur.table_name # only for trends } for key, val in self.mappings.get(name).get(self._athena_table_name).items(): logger.debug(f'Mapping field {key} to {val}') @@ -185,7 +185,7 @@ def get_dummy_account_mapping_sql(self, name) -> list: ).decode('utf-8')) columns_tpl = { 'athena_view_name': name, - 'cur_table_name': self.cur.tableName + 'cur_table_name': self.cur.table_name } compiled_query = template.safe_substitute(columns_tpl) return compiled_query diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index a18ff28d..52f39a9c 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -1,3 +1,5 @@ +""" Manage AWS CUR +""" import json import logging @@ -10,6 +12,8 @@ class CUR(CidBase): + """ Manage AWS CUR + """ cur_minimal_required_columns = [ 'bill_bill_type', 'bill_billing_entity', @@ -25,7 +29,6 @@ class CUR(CidBase): 'line_item_operation', 'line_item_product_code', 'line_item_resource_id', - #'line_item_resource_id', 'line_item_unblended_cost', 'line_item_usage_account_id', 'line_item_usage_amount', @@ -34,20 +37,6 @@ class CUR(CidBase): 'line_item_usage_type', 'pricing_term', 'pricing_unit', - # 'product_database_engine', - # 'product_deployment_option', - # 'product_from_location', - # 'product_group', - # 'product_instance_type', - # 'product_instance_type_family', - # 'product_operating_system', - # 'product_product_family', - # 'product_product_name', - # 'product_region', - # 'product_servicecode', - # 'product_storage', - # 'product_to_location', - # 'product_volume_api_name', ] ri_required_columns = [ 'reservation_reservation_a_r_n', @@ -67,43 +56,40 @@ class CUR(CidBase): 'savings_plan_offering_type', 'savings_plan_payment_option' ] - _tableName = None + _table_name = None _metadata = None - _clients = dict() - _hasResourceIDs = None - _hasSavingsPlans = None - _hasReservations = None + _clients = {} + _has_resource_ids = None + _has_savings_plans = None + _has_reservations = None _configured = None - _status = str() + _status = {} - def __init__(self, session) -> None: - super().__init__(session) - @property def athena(self) -> Athena: + """ Get Athena Client """ if not self._clients.get('athena'): - self._clients.update({ - 'athena': Athena(self.session) - }) + self._clients['athena'] = Athena(self.session) return self._clients.get('athena') @athena.setter def athena(self, client) -> Athena: + """ Set Athena Client """ if not self._clients.get('athena'): - self._clients.update({ - 'athena': client - }) + self._clients['athena'] = client return self._clients.get('athena') @property def glue(self) -> Glue: + """ Get Glue Client """ if not self._clients.get('glue'): self._clients['glue'] = Glue(self.session) return self._clients.get('glue') @glue.setter def glue(self, client) -> Glue: + """ Set Glue client """ if not self._clients.get('glue'): self._clients['glue'] = client return self._clients.get('glue') @@ -112,39 +98,40 @@ def glue(self, client) -> Glue: def configured(self) -> bool: """ Check if AWS Data Catalog and Athena database exist """ if self._configured is None: - if self.athena.CatalogName and self.athena.DatabaseName: - self._configured = True - else: - self._configured = False + self._configured = bool(self.athena.CatalogName and self.athena.DatabaseName) return self._configured @property - def tableName(self) -> str: + def table_name(self) -> str: + """ Get Athena table name """ if self.metadata is None: raise CidCritical('Error: Cannot detect any CUR table. Hint: Check if AWS Lake Formation is activated on your account, verify that the LakeFormationEnabled parameter is set to yes on the deployment stack') return self.metadata.get('Name') @property - def hasResourceIDs(self) -> bool: - if self._configured and self._hasResourceIDs is None: - self._hasResourceIDs = 'line_item_resource_id' in self.fields - return self._hasResourceIDs + def has_resource_ids(self) -> bool: + """ Return True if CUR has resource ids """ + if self._configured and self._has_resource_ids is None: + self._has_resource_ids = 'line_item_resource_id' in self.fields + return self._has_resource_ids @property - def hasReservations(self) -> bool: - if self._configured and self._hasReservations is None: - logger.debug(f'{self.ri_required_columns}: {[c in self.fields for c in self.ri_required_columns]}') - self._hasReservations=all([c in self.fields for c in self.ri_required_columns]) - logger.info(f'Reserved Instances: {self._hasReservations}') - return self._hasReservations + def has_reservations(self) -> bool: + """ Return True if CUR has reservation fields """ + if self._configured and self._has_reservations is None: + logger.debug(f'{self.ri_required_columns}: {[col in self.fields for col in self.ri_required_columns]}') + self._has_reservations = all(col in self.fields for col in self.ri_required_columns) + logger.info(f'Reserved Instances: {self._has_reservations}') + return self._has_reservations @property - def hasSavingsPlans(self) -> bool: - if self._configured and self._hasSavingsPlans is None: - logger.debug(f'{self.sp_required_columns}: {[c in self.fields for c in self.sp_required_columns]}') - self._hasSavingsPlans=all([c in self.fields for c in self.sp_required_columns]) - logger.info(f'Savings Plans: {self._hasSavingsPlans}') - return self._hasSavingsPlans + def has_savings_plans(self) -> bool: + """ Return True if CUR has savings plan """ + if self._configured and self._has_savings_plans is None: + logger.debug(f'{self.sp_required_columns}: {[col in self.fields for col in self.sp_required_columns]}') + self._has_savings_plans=all(col in self.fields for col in self.sp_required_columns) + logger.info(f'Savings Plans: {self._has_savings_plans}') + return self._has_savings_plans def get_type_of_column(self, column: str): """ Return an Athena type of a given non existent CUR column """ @@ -155,7 +142,7 @@ def get_type_of_column(self, column: str): return 'DOUBLE' if column.endswith('_date') and not column.endswith('_to_date'): return 'TIMESTAMP' - SPECIAL = { + special_cases = { "reservation_amortized_upfront_cost_for_usage": "DOUBLE", "reservation_amortized_upfront_fee_for_billing_period": "DOUBLE", "reservation_recurring_fee_for_usage": "DOUBLE", @@ -173,7 +160,7 @@ def get_type_of_column(self, column: str): "savings_plan_net_amortized_upfront_commitment_for_billing_period": "DOUBLE", "savings_plan_recurring_commitment_for_billing_period": "DOUBLE", } - return SPECIAL.get(column, 'STRING') + return special_cases.get(column, 'STRING') def ensure_column(self, column: str, column_type: str=None): """ Ensure column is in the cur. If it is not there - add column """ @@ -191,22 +178,22 @@ def ensure_column(self, column: str, column_type: str=None): column_type = column_type or self.get_type_of_column(column) try: - self.athena.query(f'ALTER TABLE {self._tableName} ADD COLUMNS ({column} {column_type})') + self.athena.query(f'ALTER TABLE {self._table_name} ADD COLUMNS ({column} {column_type})') except (self.athena.client.exceptions.ClientError, CidCritical) as exc: raise CidCritical(f'Column {column} is not found in CUR and we were unable to add it. Please check FAQ.') from exc - self._metadata = self.athena.get_table_metadata(self._tableName) # refresh table metadata - logger.critical(f"Column '{column}' was added to CUR ({self._tableName}). Please make sure crawler do not override that columns. Crawler='{crawler_name}'") + self._metadata = self.athena.get_table_metadata(self._table_name) # refresh table metadata + logger.critical(f"Column '{column}' was added to CUR ({self._table_name}). Please make sure crawler do not override that columns. Crawler='{crawler_name}'") def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=False) -> bool: """ return True if table metadata fits CUR definition. """ try: table = table or self.athena.get_table_metadata(name) - except Exception as exc: + except Exception as exc: #pylint: disable=broad-exception-caught logger.debug(exc) return False if not return_reason else (False, f'cannot get table {name}. {exc}.') table_name = table.get('Name') - columns = [cols.get('Name') for cols in table.get('Columns')] + columns = [col.get('Name') for col in table.get('Columns')] missing_columns = [col for col in self.cur_minimal_required_columns if col not in columns] if missing_columns: return False if not return_reason else (False, f"Table {table_name} does not contain columns: {','.join(missing_columns)}. You can try ALTER TABLE {table_name} ADD COLUMNS (missing_column string).") @@ -215,18 +202,19 @@ def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=Fal @property def metadata(self) -> dict: + """get Athena metadata for the table of CUR """ if self._metadata: return self._metadata if get_parameters().get('cur-table-name'): - self._tableName = get_parameters().get('cur-table-name') + self._table_name = get_parameters().get('cur-table-name') try: - self._metadata = self.athena.get_table_metadata(self._tableName) + self._metadata = self.athena.get_table_metadata(self._table_name) except self.athena.client.exceptions.ResourceNotFoundException as exc: - raise CidCritical('Provided cur-table-name "{self._tableName}" is not found. Please make sure the table exists.') from exc + raise CidCritical('Provided cur-table-name "{self._table_name}" is not found. Please make sure the table exists.') from exc res, message = self.table_is_cur(table=self._metadata, return_reason=True) if not res: - raise CidCritical(f'Table {self._tableName} does not look like CUR. {message}') + raise CidCritical(f'Table {self._table_name} does not look like CUR. {message}') else: # Look all tables and filter ones with CUR fields all_tables = self.athena.list_table_metadata() @@ -241,20 +229,21 @@ def metadata(self) -> dict: raise CidCritical(f'CUR table not found. (scanned {len(all_tables)} tables in Athena Database {self.athena.DatabaseName} in {self.athena.region}). But none has required fields: {self.cur_minimal_required_columns}.') if len(cur_tables) == 1: self._metadata = cur_tables[0] - self._tableName = self._metadata.get('Name') - logger.info('1 CUR table found: %s', self._tableName) + self._table_name = self._metadata.get('Name') + logger.info('1 CUR table found: %s', self._table_name) elif len(cur_tables) > 1: - self._tableName = get_parameter( + self._table_name = get_parameter( param_name='cur-table-name', message="Multiple CUR tables found, please select one", choices=sorted([v.get('Name') for v in cur_tables], reverse=True), ) - self._metadata = self.athena.get_table_metadata(self._tableName) + self._metadata = self.athena.get_table_metadata(self._table_name) return self._metadata @property def fields(self) -> list: - return [v.get('Name') for v in self.metadata.get('Columns', list())] + """get CUR fields """ + return [col.get('Name') for col in self.metadata.get('Columns', [])] @property def tag_and_cost_category_fields(self) -> list: From 9839247ec006f5da8398d8b151418bf9887e1827 Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sun, 24 Dec 2023 12:42:24 +0100 Subject: [PATCH 10/18] remove resource id dep --- cid/helpers/cur.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index 52f39a9c..53c33c68 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -28,7 +28,6 @@ class CUR(CidBase): 'line_item_line_item_type', 'line_item_operation', 'line_item_product_code', - 'line_item_resource_id', 'line_item_unblended_cost', 'line_item_usage_account_id', 'line_item_usage_amount', From 17db51a44a7f4c7d000839545a10ce5d6c3a051e Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sun, 24 Dec 2023 12:44:45 +0100 Subject: [PATCH 11/18] remove resource_id dep --- cid/builtin/core/data/resources.yaml | 2 +- cid/helpers/cur.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cid/builtin/core/data/resources.yaml b/cid/builtin/core/data/resources.yaml index d4f67d97..cb820019 100644 --- a/cid/builtin/core/data/resources.yaml +++ b/cid/builtin/core/data/resources.yaml @@ -278,7 +278,7 @@ views: - line_item_line_item_type - line_item_operation - line_item_product_code - - Line_item_resource_id + - line_item_resource_id - line_item_unblended_cost - line_item_usage_account_id - line_item_usage_amount diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index 53c33c68..0b5a405d 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -102,7 +102,7 @@ def configured(self) -> bool: @property def table_name(self) -> str: - """ Get Athena table name """ + """ Get Athena table name """ if self.metadata is None: raise CidCritical('Error: Cannot detect any CUR table. Hint: Check if AWS Lake Formation is activated on your account, verify that the LakeFormationEnabled parameter is set to yes on the deployment stack') return self.metadata.get('Name') From 7c53141abc4401a0d7c3ea16a73c4222c24a6855 Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sun, 24 Dec 2023 13:03:39 +0100 Subject: [PATCH 12/18] more refactoring --- cid/helpers/cur.py | 67 +++++++++++++++------------------------------- 1 file changed, 22 insertions(+), 45 deletions(-) diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index 0b5a405d..3dea9732 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -2,6 +2,7 @@ """ import json import logging +from functools import cache, cached_property from cid.base import CidBase from cid.helpers import Athena, Glue @@ -58,48 +59,30 @@ class CUR(CidBase): _table_name = None _metadata = None _clients = {} - _has_resource_ids = None - _has_savings_plans = None - _has_reservations = None - _configured = None - _status = {} - @property + @cached_property def athena(self) -> Athena: """ Get Athena Client """ - if not self._clients.get('athena'): - self._clients['athena'] = Athena(self.session) - return self._clients.get('athena') + return self._clients.get('athena') or Athena(self.session) @athena.setter def athena(self, client) -> Athena: """ Set Athena Client """ - if not self._clients.get('athena'): - self._clients['athena'] = client + self._clients['athena'] = client return self._clients.get('athena') - @property + @cached_property def glue(self) -> Glue: """ Get Glue Client """ - if not self._clients.get('glue'): - self._clients['glue'] = Glue(self.session) - return self._clients.get('glue') + return self._clients.get('glue') or Glue(self.session) @glue.setter def glue(self, client) -> Glue: """ Set Glue client """ - if not self._clients.get('glue'): - self._clients['glue'] = client + self._clients['glue'] = client return self._clients.get('glue') - @property - def configured(self) -> bool: - """ Check if AWS Data Catalog and Athena database exist """ - if self._configured is None: - self._configured = bool(self.athena.CatalogName and self.athena.DatabaseName) - return self._configured - @property def table_name(self) -> str: """ Get Athena table name """ @@ -107,30 +90,20 @@ def table_name(self) -> str: raise CidCritical('Error: Cannot detect any CUR table. Hint: Check if AWS Lake Formation is activated on your account, verify that the LakeFormationEnabled parameter is set to yes on the deployment stack') return self.metadata.get('Name') - @property + @cached_property def has_resource_ids(self) -> bool: """ Return True if CUR has resource ids """ - if self._configured and self._has_resource_ids is None: - self._has_resource_ids = 'line_item_resource_id' in self.fields - return self._has_resource_ids + return 'line_item_resource_id' in self.fields - @property + @cached_property def has_reservations(self) -> bool: """ Return True if CUR has reservation fields """ - if self._configured and self._has_reservations is None: - logger.debug(f'{self.ri_required_columns}: {[col in self.fields for col in self.ri_required_columns]}') - self._has_reservations = all(col in self.fields for col in self.ri_required_columns) - logger.info(f'Reserved Instances: {self._has_reservations}') - return self._has_reservations + return all(col in self.fields for col in self.ri_required_columns) - @property + @cached_property def has_savings_plans(self) -> bool: """ Return True if CUR has savings plan """ - if self._configured and self._has_savings_plans is None: - logger.debug(f'{self.sp_required_columns}: {[col in self.fields for col in self.sp_required_columns]}') - self._has_savings_plans=all(col in self.fields for col in self.sp_required_columns) - logger.info(f'Savings Plans: {self._has_savings_plans}') - return self._has_savings_plans + return all(col in self.fields for col in self.sp_required_columns) def get_type_of_column(self, column: str): """ Return an Athena type of a given non existent CUR column """ @@ -170,18 +143,22 @@ def ensure_column(self, column: str, column_type: str=None): crawler_name = self.metadata.get('Parameters', {}).get('UPDATED_BY_CRAWLER') if crawler_name: # Check Crawler Behavior - if it does not have a Configuration/CrawlerOutput/TablesAddOrUpdateBehavior == MergeNewColumns, it will override columns - config = json.loads(self.glue.get_crawler(crawler_name).get('Configuration', '{}')) + try: + crawler = self.glue.get_crawler(crawler_name) + except self.glue.client.exceptions.ClientError as exc: + raise CidCritical(f'Column {column} is not found in CUR ({self.table_name}). And we were unable to check if crawler {crawler_name} is configured to override columns.') from exc + config = json.loads(crawler.get('Configuration', '{}')) add_or_update = config.get('CrawlerOutput', {}).get('Tables', {}).get('AddOrUpdateBehavior') if add_or_update != 'MergeNewColumns': - raise CidCritical(f'Column {column} is not found in CUR. And we were unable to add it as crawler {crawler_name} is configured to override columns.') + raise CidCritical(f'Column {column} is not found in CUR ({self.table_name}). And we were unable to add it as crawler {crawler_name} is configured to override columns.') column_type = column_type or self.get_type_of_column(column) try: - self.athena.query(f'ALTER TABLE {self._table_name} ADD COLUMNS ({column} {column_type})') + self.athena.query(f'ALTER TABLE {self.table_name} ADD COLUMNS ({column} {column_type})') except (self.athena.client.exceptions.ClientError, CidCritical) as exc: raise CidCritical(f'Column {column} is not found in CUR and we were unable to add it. Please check FAQ.') from exc - self._metadata = self.athena.get_table_metadata(self._table_name) # refresh table metadata - logger.critical(f"Column '{column}' was added to CUR ({self._table_name}). Please make sure crawler do not override that columns. Crawler='{crawler_name}'") + self._metadata = self.athena.get_table_metadata(self.table_name) # refresh table metadata + logger.critical(f"Column '{column}' was added to CUR ({self.table_name}). Please make sure crawler do not override that columns. Crawler='{crawler_name}'") def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=False) -> bool: """ return True if table metadata fits CUR definition. """ From c4af0fa02328bb1ebd3b621f82ea75ba4b6382e4 Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sun, 24 Dec 2023 19:41:13 +0100 Subject: [PATCH 13/18] more refactoring --- cid/helpers/cur.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index 3dea9732..4874e816 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -2,7 +2,7 @@ """ import json import logging -from functools import cache, cached_property +from functools import cached_property from cid.base import CidBase from cid.helpers import Athena, Glue @@ -90,17 +90,17 @@ def table_name(self) -> str: raise CidCritical('Error: Cannot detect any CUR table. Hint: Check if AWS Lake Formation is activated on your account, verify that the LakeFormationEnabled parameter is set to yes on the deployment stack') return self.metadata.get('Name') - @cached_property + @property def has_resource_ids(self) -> bool: """ Return True if CUR has resource ids """ return 'line_item_resource_id' in self.fields - @cached_property + @property def has_reservations(self) -> bool: """ Return True if CUR has reservation fields """ return all(col in self.fields for col in self.ri_required_columns) - @cached_property + @property def has_savings_plans(self) -> bool: """ Return True if CUR has savings plan """ return all(col in self.fields for col in self.sp_required_columns) From 77852771a73bcd5d4421622cafeca1a988586776 Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sun, 24 Dec 2023 20:05:41 +0100 Subject: [PATCH 14/18] fixes --- cid/helpers/cur.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index 4874e816..1dbc8fec 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -2,7 +2,6 @@ """ import json import logging -from functools import cached_property from cid.base import CidBase from cid.helpers import Athena, Glue @@ -61,27 +60,31 @@ class CUR(CidBase): _clients = {} - @cached_property + @property def athena(self) -> Athena: """ Get Athena Client """ - return self._clients.get('athena') or Athena(self.session) + if 'athena' not in self._clients: + self._clients['athena'] = Athena(self.session) + return self._clients['athena'] @athena.setter def athena(self, client) -> Athena: """ Set Athena Client """ self._clients['athena'] = client - return self._clients.get('athena') + return self._clients['athena'] - @cached_property + @property def glue(self) -> Glue: """ Get Glue Client """ - return self._clients.get('glue') or Glue(self.session) + if 'glue' not in self._clients: + self._clients['glue'] = Glue(self.session) + return self._clients['glue'] @glue.setter def glue(self, client) -> Glue: """ Set Glue client """ self._clients['glue'] = client - return self._clients.get('glue') + return self._clients['glue'] @property def table_name(self) -> str: From d129f30be17215fc6089ef42d0c028f36761df52 Mon Sep 17 00:00:00 2001 From: Iakov Gan Date: Sun, 24 Dec 2023 21:42:45 +0100 Subject: [PATCH 15/18] fixes --- cid/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cid/common.py b/cid/common.py index 9aee6e6d..de9af5e6 100644 --- a/cid/common.py +++ b/cid/common.py @@ -128,7 +128,7 @@ def cur(self) -> CUR: _cur.glue = self.glue print('Checking if CUR is enabled and available...') - if not _cur.configured: + if not _cur.metadata: raise CidCritical("Error: please ensure CUR is enabled, if yes allow it some time to propagate") print(f'\tAthena table: {_cur.table_name}') From f7bc593a3169fc120abd28d6a968047e298129ad Mon Sep 17 00:00:00 2001 From: Iakov GAN <82834333+iakov-aws@users.noreply.github.com> Date: Wed, 3 Jan 2024 13:51:56 +0100 Subject: [PATCH 16/18] Update cid/helpers/cur.py Co-authored-by: Yuriy Prykhodko --- cid/helpers/cur.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index 1dbc8fec..9346567d 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -153,7 +153,7 @@ def ensure_column(self, column: str, column_type: str=None): config = json.loads(crawler.get('Configuration', '{}')) add_or_update = config.get('CrawlerOutput', {}).get('Tables', {}).get('AddOrUpdateBehavior') if add_or_update != 'MergeNewColumns': - raise CidCritical(f'Column {column} is not found in CUR ({self.table_name}). And we were unable to add it as crawler {crawler_name} is configured to override columns.') + raise CidCritical(f'Column {column} is not found in CUR ({self.table_name}). And we were unable to add it as crawler {crawler_name} is configured to override columns. Change crawler settings and run again.') column_type = column_type or self.get_type_of_column(column) try: From d7910e1036d8d84e79dd611e2a995b7bb0635c25 Mon Sep 17 00:00:00 2001 From: Iakov GAN <82834333+iakov-aws@users.noreply.github.com> Date: Wed, 3 Jan 2024 13:55:43 +0100 Subject: [PATCH 17/18] Update cid/helpers/cur.py Co-authored-by: Yuriy Prykhodko --- cid/helpers/cur.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index 9346567d..68e0dc7b 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -159,7 +159,7 @@ def ensure_column(self, column: str, column_type: str=None): try: self.athena.query(f'ALTER TABLE {self.table_name} ADD COLUMNS ({column} {column_type})') except (self.athena.client.exceptions.ClientError, CidCritical) as exc: - raise CidCritical(f'Column {column} is not found in CUR and we were unable to add it. Please check FAQ.') from exc + raise CidCritical(f'Column {column} is not found in CUR and we were unable to add it.') from exc self._metadata = self.athena.get_table_metadata(self.table_name) # refresh table metadata logger.critical(f"Column '{column}' was added to CUR ({self.table_name}). Please make sure crawler do not override that columns. Crawler='{crawler_name}'") From 8a0dba9f8ca1bcc1a7be6abe3bd074e3b3798be8 Mon Sep 17 00:00:00 2001 From: Iakov GAN <82834333+iakov-aws@users.noreply.github.com> Date: Wed, 3 Jan 2024 15:49:52 +0100 Subject: [PATCH 18/18] Update cid/helpers/cur.py Co-authored-by: Yuriy Prykhodko --- cid/helpers/cur.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cid/helpers/cur.py b/cid/helpers/cur.py index 68e0dc7b..a1933803 100644 --- a/cid/helpers/cur.py +++ b/cid/helpers/cur.py @@ -161,7 +161,7 @@ def ensure_column(self, column: str, column_type: str=None): except (self.athena.client.exceptions.ClientError, CidCritical) as exc: raise CidCritical(f'Column {column} is not found in CUR and we were unable to add it.') from exc self._metadata = self.athena.get_table_metadata(self.table_name) # refresh table metadata - logger.critical(f"Column '{column}' was added to CUR ({self.table_name}). Please make sure crawler do not override that columns. Crawler='{crawler_name}'") + logger.critical(f"Column '{column}' was added to CUR ({self.table_name}).") def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=False) -> bool: """ return True if table metadata fits CUR definition. """