From 34fea4fcf5690447f831583c63f13682d175a8fe Mon Sep 17 00:00:00 2001 From: Tejas Rajopadhye <71188245+TejasRGitHub@users.noreply.github.com> Date: Fri, 23 Feb 2024 11:17:26 -0600 Subject: [PATCH] [Gh 904] Central Catalog Support (#1021) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Feature or Bugfix - Feature ### Detail PR containing all the code raised in PR - https://github.com/data-dot-all/dataall/pull/905 + Unit Tests + Addressing comments raised on that PR. Copy pasting details from PR - Detect if the source database is a resource link If it is a resource link, check that the catalog account has been onboarded to data.all Check for the presence of owner_account_id tag on the database The tag needs to exist and the value has to match the account id of the share approver Credits - @blitzmohit ## Testing Running Unit tests - ✅ Testing on AWS Deployed data.all instance with the Original PR - ✅ Sanity testing after addressing comments - **[EDIT]** ✅ ( Testing done ) ### Relates - https://github.com/data-dot-all/dataall/issues/904 ### Security Please answer the questions below briefly where applicable, or write `N/A`. Based on [OWASP 10](https://owasp.org/Top10/en/). - Does this PR introduce or modify any input fields or queries - this includes fetching data from storage outside the application (e.g. a database, an S3 bucket)? No - Is the input sanitized? - What precautions are you taking before deserializing the data you consume? - Is injection prevented by parametrizing queries? - Have you ensured no `eval` or similar functions are used? - Does this PR introduce any functionality or component that requires authorization? No - How have you ensured it respects the existing AuthN/AuthZ mechanisms? - Are you logging failed auth attempts? - Are you using or adding any cryptographic features? No - Do you use a standard proven implementations? - Are the used keys controlled by the customer? Where are they stored? 
- Are you introducing any new policies/roles/users? Yes - Have you used the least-privilege principle? How? Yes By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. --------- Co-authored-by: trajopadhye --- backend/dataall/base/aws/sts.py | 12 + .../dataset_sharing/aws/glue_client.py | 45 +++- .../modules/dataset_sharing/aws/ram_client.py | 3 +- .../share_managers/lf_share_manager.py | 248 ++++++++++++++---- .../lakeformation_process_share.py | 68 +++-- .../datasets/tasks/test_lf_share_manager.py | 231 ++++++++++++---- 6 files changed, 469 insertions(+), 138 deletions(-) diff --git a/backend/dataall/base/aws/sts.py b/backend/dataall/base/aws/sts.py index f932459ca..3bd7354bf 100644 --- a/backend/dataall/base/aws/sts.py +++ b/backend/dataall/base/aws/sts.py @@ -359,3 +359,15 @@ def generate_console_url(credentials, session_duration=None, region='eu-west-1', # Send final URL to stdout return request_url + + @staticmethod + def is_assumable_pivot_role(accountid): + try: + SessionHelper.remote_session(accountid=accountid) + except ClientError as e: + log.error(f'Failed to assume dataall pivot role session in environment with account id {accountid} due to {e}') + return False + except Exception as e: + log.error(f'Unexpected error while assuming data.all pivot role in environment with account id {accountid} due to {e}') + return False + return True diff --git a/backend/dataall/modules/dataset_sharing/aws/glue_client.py b/backend/dataall/modules/dataset_sharing/aws/glue_client.py index 11d6c0c08..48226ba28 100644 --- a/backend/dataall/modules/dataset_sharing/aws/glue_client.py +++ b/backend/dataall/modules/dataset_sharing/aws/glue_client.py @@ -13,6 +13,7 @@ def __init__(self, account_id, region, database): self._client = aws_session.client('glue', region_name=region) self._database = database self._account_id = account_id + self._region = region def create_database(self, location): try: @@ -125,14 +126,14 @@ 
def delete_table(self, table_name): ) raise e - def create_resource_link(self, resource_link_name, table, catalog_id): + def create_resource_link(self, resource_link_name, table, catalog_id, database): account_id = self._account_id shared_database = self._database resource_link_input = { 'Name': table.GlueTableName, 'TargetTable': { 'CatalogId': catalog_id, - 'DatabaseName': table.GlueDatabaseName, + 'DatabaseName': database, 'Name': table.GlueTableName, }, } @@ -192,3 +193,43 @@ def delete_database(self): f'due to: {e}' ) raise e + + def get_source_catalog(self): + """ Get the source catalog account details """ + try: + log.info(f'Fetching source catalog details for database {self._database}...') + response = self._client.get_database(CatalogId=self._account_id, Name=self._database) + linked_database = response.get('Database', {}).get('TargetDatabase', {}) + log.info(f'Fetched source catalog details for database {self._database} are: {linked_database}...') + if linked_database: + return {'account_id' : linked_database.get('CatalogId'), 'database_name' : linked_database.get('DatabaseName'), 'region' : linked_database.get('Region', self._region)} + + except self._client.exceptions.EntityNotFoundException as enoFnd: + log.exception(f'Could not fetch source catalog details for database {self._database} due to {enoFnd}') + raise enoFnd + except Exception as e: + log.exception(f'Error fetching source catalog details for database {self._database} due to {e}') + raise e + return None + + def get_database_tags(self): + # Get tags from the glue database + account_id = self._account_id + database = self._database + region = self._region + + try: + log.info(f'Getting tags for database {database}') + resource_arn = f'arn:aws:glue:{region}:{account_id}:database/{database}' + response = self._client.get_tags(ResourceArn=resource_arn) + tags = response['Tags'] + + log.info(f'Successfully retrieved tags: {tags}') + + return tags + except 
self._client.exceptions.EntityNotFoundException as entNotFound: + log.exception(f'Could not get tags for database {database} due to {entNotFound}') + raise entNotFound + except Exception as e: + log.exception(f'Error fetching tags for {database} due to {e}') + raise e diff --git a/backend/dataall/modules/dataset_sharing/aws/ram_client.py b/backend/dataall/modules/dataset_sharing/aws/ram_client.py index c471be675..833dda4b2 100644 --- a/backend/dataall/modules/dataset_sharing/aws/ram_client.py +++ b/backend/dataall/modules/dataset_sharing/aws/ram_client.py @@ -64,7 +64,8 @@ def _accept_resource_share_invitation(self, resource_share_invitation_arn): raise e @staticmethod - def accept_ram_invitation(source_account_id, source_region, target_account_id, target_region, source_database, source_table): + def accept_ram_invitation(source_account_id, source_region, source_database, source_table, target_account_id, + target_region): """ Accepts RAM invitations on the target account """ diff --git a/backend/dataall/modules/dataset_sharing/services/share_managers/lf_share_manager.py b/backend/dataall/modules/dataset_sharing/services/share_managers/lf_share_manager.py index b3f2eab95..e3ccff850 100644 --- a/backend/dataall/modules/dataset_sharing/services/share_managers/lf_share_manager.py +++ b/backend/dataall/modules/dataset_sharing/services/share_managers/lf_share_manager.py @@ -11,6 +11,9 @@ from dataall.base.aws.iam import IAM from dataall.base.aws.sts import SessionHelper from dataall.base.db import exceptions +from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectRepository, ShareItemSM +from dataall.modules.dataset_sharing.services.dataset_sharing_enums import ShareItemStatus, ShareObjectActions, \ + ShareItemActions from dataall.modules.datasets_base.db.dataset_models import DatasetTable, Dataset from dataall.modules.dataset_sharing.services.dataset_alarm_service import DatasetAlarmService from 
dataall.modules.dataset_sharing.db.share_object_models import ShareObjectItem, ShareObject @@ -20,15 +23,15 @@ class LFShareManager: def __init__( - self, - session, - dataset: Dataset, - share: ShareObject, - shared_tables: [DatasetTable], - revoked_tables: [DatasetTable], - source_environment: Environment, - target_environment: Environment, - env_group: EnvironmentGroup, + self, + session, + dataset: Dataset, + share: ShareObject, + shared_tables: [DatasetTable], + revoked_tables: [DatasetTable], + source_environment: Environment, + target_environment: Environment, + env_group: EnvironmentGroup, ): self.session = session self.env_group = env_group @@ -38,22 +41,16 @@ def __init__( self.revoked_tables = revoked_tables self.source_environment = source_environment self.target_environment = target_environment + # Set the source account details by checking if a catalog account exists + self.source_account_id, self.source_account_region, self.source_database_name = self.init_source_account_details() self.shared_db_name, self.is_new_share = self.build_shared_db_name() self.principals = self.get_share_principals() - self.cross_account = self.target_environment.AwsAccountId != self.source_environment.AwsAccountId - self.lf_client_in_target = LakeFormationClient( - account_id=self.target_environment.AwsAccountId, - region=self.target_environment.region - ) - self.lf_client_in_source = LakeFormationClient( - account_id=self.source_environment.AwsAccountId, - region=self.source_environment.region - ) - self.glue_client_in_target = GlueClient( - account_id=self.target_environment.AwsAccountId, - region=self.target_environment.region, - database=self.shared_db_name, - ) + self.cross_account = self.target_environment.AwsAccountId != self.source_account_id + # Below Clients initialized by the initialize_clients() + self.glue_client_in_source = None + self.glue_client_in_target = None + self.lf_client_in_source = None + self.lf_client_in_target = None @abc.abstractmethod def 
process_approved_shares(self) -> [str]: @@ -63,6 +60,19 @@ def process_approved_shares(self) -> [str]: def process_revoked_shares(self) -> [str]: return NotImplementedError + def init_source_account_details(self): + """ + Check if the catalog account is present and update the source account, source database and source region accordingly + """ + catalog_account_present = self.check_catalog_account_exists_and_verify() + if catalog_account_present is not False: + if catalog_account_present is not None: + return self.get_catalog_account_details() + else: + return None, None, None + else: + return self.source_environment.AwsAccountId, self.source_environment.region, self.dataset.GlueDatabaseName + def get_share_principals(self) -> [str]: """ Builds list of principals of the share request @@ -73,7 +83,8 @@ def get_share_principals(self) -> [str]: role_name=self.share.principalIAMRoleName ) principals = [principal_iam_role_arn] - dashboard_enabled = EnvironmentService.get_boolean_env_param(self.session, self.target_environment, "dashboardsEnabled") + dashboard_enabled = EnvironmentService.get_boolean_env_param(self.session, self.target_environment, + "dashboardsEnabled") if dashboard_enabled: group = QuicksightClient.create_quicksight_group( @@ -92,11 +103,14 @@ def build_shared_db_name(self) -> tuple: For shares after 2.3.0 the suffix returned is "_shared" :return: Shared database name, boolean indicating if it is a new share """ - old_shared_db_name = (self.dataset.GlueDatabaseName + '_shared_' + self.share.shareUri)[:254] + if self.source_database_name is None: + return '', True + old_shared_db_name = (self.source_database_name + '_shared_' + self.share.shareUri)[:254] warn('old_shared_db_name will be deprecated in v2.6.0', DeprecationWarning, stacklevel=2) logger.info( f'Checking shared db {old_shared_db_name} exists in {self.target_environment.AwsAccountId}...' 
) + database = GlueClient( account_id=self.target_environment.AwsAccountId, database=old_shared_db_name, @@ -105,10 +119,10 @@ def build_shared_db_name(self) -> tuple: if database: return old_shared_db_name, False - return self.dataset.GlueDatabaseName + '_shared', True + return self.source_database_name + '_shared', True def check_table_exists_in_source_database( - self, share_item: ShareObjectItem, table: DatasetTable + self, share_item: ShareObjectItem, table: DatasetTable ) -> True: """ Checks if the table to be shared exists on the Glue catalog in the source account @@ -116,12 +130,7 @@ def check_table_exists_in_source_database( :param table: DatasetTable :return: True or raise exceptions.AWSResourceNotFound """ - glue_client = GlueClient( - account_id=self.source_environment.AwsAccountId, - region=self.source_environment.region, - database=table.GlueDatabaseName, - ) - if not glue_client.table_exists(table.GlueTableName): + if not self.glue_client_in_source.table_exists(table.GlueTableName): raise exceptions.AWSResourceNotFound( action='ProcessShare', message=( @@ -132,19 +141,14 @@ def check_table_exists_in_source_database( return True def check_resource_link_table_exists_in_target_database( - self, table: DatasetTable + self, table: DatasetTable ) -> bool: """ Checks if the table to be shared exists on the Glue catalog in the target account as resource link :param table: DatasetTable :return: Boolean """ - glue_client = GlueClient( - account_id=self.target_environment.AwsAccountId, - region=self.target_environment.region, - database=self.shared_db_name, - ) - if glue_client.table_exists(table.GlueTableName): + if self.glue_client_in_target.table_exists(table.GlueTableName): return True logger.info( f'Resource link could not be found ' @@ -161,9 +165,9 @@ def revoke_iam_allowed_principals_from_table(self, table: DatasetTable) -> True: """ self.lf_client_in_source.revoke_permissions_from_table( principals=['EVERYONE'], - database_name=table.GlueDatabaseName, 
+ database_name=self.source_database_name, table_name=table.GlueTableName, - catalog_id=self.source_environment.AwsAccountId, + catalog_id=self.source_account_id, permissions=['ALL'] ) return True @@ -174,8 +178,8 @@ def grant_pivot_role_all_database_permissions_to_source_database(self) -> True: :return: True if it is successful """ self.lf_client_in_source.grant_permissions_to_database( - principals=[SessionHelper.get_delegation_role_arn(self.source_environment.AwsAccountId)], - database_name=self.dataset.GlueDatabaseName, + principals=[SessionHelper.get_delegation_role_arn(self.source_account_id)], + database_name=self.source_database_name, permissions=['ALL'], ) return True @@ -237,9 +241,9 @@ def grant_target_account_permissions_to_source_table(self, table: DatasetTable) """ self.lf_client_in_source.grant_permissions_to_table( principals=[self.target_environment.AwsAccountId], - database_name=table.GlueDatabaseName, + database_name=self.source_database_name, table_name=table.GlueTableName, - catalog_id=self.source_environment.AwsAccountId, + catalog_id=self.source_account_id, permissions=['DESCRIBE', 'SELECT'], permissions_with_grant_options=['DESCRIBE', 'SELECT'] ) @@ -253,12 +257,12 @@ def check_if_exists_and_create_resource_link_table_in_shared_database(self, tabl :param table: DatasetTable :return: True if it is successful """ - if not self.check_resource_link_table_exists_in_target_database(table): self.glue_client_in_target.create_resource_link( resource_link_name=table.GlueTableName, table=table, - catalog_id=self.source_environment.AwsAccountId + catalog_id=self.source_account_id, + database=self.source_database_name ) return True @@ -285,9 +289,9 @@ def grant_principals_permissions_to_table_in_target(self, table: DatasetTable) - """ self.lf_client_in_target.grant_permissions_to_table_with_columns( principals=self.principals, - database_name=table.GlueDatabaseName, + database_name=self.source_database_name, table_name=table.GlueTableName, - 
catalog_id=self.source_environment.AwsAccountId, + catalog_id=self.source_account_id, permissions=['DESCRIBE', 'SELECT'] ) return True @@ -319,13 +323,14 @@ def revoke_principals_permissions_to_table_in_target(self, table: DatasetTable, :param other_table_shares_in_env: Boolean. Other table shares in this environment for this table :return: True if it is successful """ - principals = self.principals if not other_table_shares_in_env else [p for p in self.principals if "arn:aws:quicksight" not in p] + principals = self.principals if not other_table_shares_in_env else [p for p in self.principals if + "arn:aws:quicksight" not in p] self.lf_client_in_target.revoke_permissions_from_table_with_columns( principals=principals, - database_name=table.GlueDatabaseName, + database_name=self.source_database_name, table_name=table.GlueTableName, - catalog_id=self.source_environment.AwsAccountId, + catalog_id=self.source_account_id, permissions=['DESCRIBE', 'SELECT'] ) return True @@ -380,18 +385,18 @@ def revoke_external_account_access_on_source_account(self, table: DatasetTable) """ self.lf_client_in_source.revoke_permissions_from_table_with_columns( principals=[self.target_environment.AwsAccountId], - database_name=table.GlueDatabaseName, + database_name=self.source_database_name, table_name=table.GlueTableName, - catalog_id=self.source_environment.AwsAccountId, + catalog_id=self.source_account_id, permissions=['DESCRIBE', 'SELECT'], permissions_with_grant_options=['DESCRIBE', 'SELECT'] ) return True def handle_share_failure( - self, - table: DatasetTable, - error: Exception, + self, + table: DatasetTable, + error: Exception, ) -> True: """ Handles share failure by raising an alarm to alarmsTopic @@ -432,3 +437,130 @@ def handle_revoke_failure( table, self.share, self.target_environment ) return True + + def handle_share_failure_for_all_tables(self, tables, error, share_item_status): + """ + Handle table share failure for all tables + :param tables - List[DatasetTable] + :param 
error - share error + :param share_item_status : Status of approved/ revoked share + returns : Returns True if handling is successful + """ + for table in tables: + share_item = ShareObjectRepository.find_sharable_item( + self.session, self.share.shareUri, table.tableUri + ) + share_item_sm = ShareItemSM(share_item_status) + new_state = share_item_sm.run_transition(ShareObjectActions.Start.value) + share_item_sm.update_state_single_item(self.session, share_item, new_state) + new_state = share_item_sm.run_transition(ShareItemActions.Failure.value) + share_item_sm.update_state_single_item(self.session, share_item, new_state) + + if share_item_status == ShareItemStatus.Share_Approved.value: + self.handle_share_failure(table=table, error=error) + if share_item_status == ShareItemStatus.Revoke_Approved.value: + self.handle_revoke_failure(table=table, error=error) + + return True + + def _verify_catalog_ownership(self, catalog_account_id, catalog_region, catalog_database): + """ + Verifies the catalog ownership by checking + 1. if the pivot role is assumable in the catalog account + 2.
if "owner_account_id" tag is present in the catalog account, which contains AWS account of source account / producer account - where the data is present in S3 bucket + Returns : Raises exception only in case there is an issue with any of above or returns True + """ + logger.info(f'Database {self.dataset.GlueDatabaseName} is a resource link and ' + f'the source database {catalog_database} belongs to a catalog account {catalog_account_id}') + if SessionHelper.is_assumable_pivot_role(catalog_account_id): + self._validate_catalog_ownership_tag(catalog_account_id, catalog_region, catalog_database) + else: + raise Exception(f'Pivot role is not assumable, catalog account {catalog_account_id} is not onboarded') + + return True + + def _validate_catalog_ownership_tag(self, catalog_account_id, catalog_region, catalog_database): + glue_client = GlueClient(account_id=catalog_account_id, + database=catalog_database, + region=catalog_region) + + tags = glue_client.get_database_tags() + if tags.get('owner_account_id', '') == self.source_environment.AwsAccountId: + logger.info( + f'owner_account_id tag exists and matches the source account id {self.source_environment.AwsAccountId}') + else: + raise Exception( + f'owner_account_id tag does not exist or does not match the source account id {self.source_environment.AwsAccountId}') + + def check_catalog_account_exists_and_verify(self): + """ + Checks if the source account has a catalog associated with it. This is checked by getting source catalog information and checking if there exists a target database for the source db + Return - + True - if a catalog account is present and it is verified + False - if no source catalog account is present + None - if catalog account exists but there is an issue with verifying the conditions needed for source account.
Check _verify_catalog_ownership for more details + """ + try: + catalog_dict = GlueClient( + account_id=self.source_environment.AwsAccountId, + region=self.source_environment.region, + database=self.dataset.GlueDatabaseName, + ).get_source_catalog() + if catalog_dict is not None and catalog_dict.get('account_id') != self.source_environment.AwsAccountId: + # Verify the ownership of dataset by checking if pivot role is assumable and ownership tag is present + self._verify_catalog_ownership(catalog_dict.get('account_id'), catalog_dict.get('region'), + catalog_dict.get('database_name')) + else: + logger.info( + f'No Catalog information found for dataset - {self.dataset.name} containing database - {self.dataset.GlueDatabaseName}') + return False + except Exception as e: + logger.error( + f'Failed to initialise catalog account details for share - {self.share.shareUri} ' + f'due to: {e}' + ) + return None + return True + + def get_catalog_account_details(self): + """ + Fetches the catalog details and returns a dict containing information about the catalog account + Returns : + 'account_id' - AWS account id of catalog account + 'region' - AWS region in which the catalog account is present + 'database_name' - DB present in the catalog account
region=self.source_account_region + ) + self.glue_client_in_target = GlueClient( + account_id=self.target_environment.AwsAccountId, + region=self.target_environment.region, + database=self.shared_db_name, + ) + self.glue_client_in_source = GlueClient( + account_id=self.source_account_id, + region=self.source_account_region, + database=self.source_database_name, + ) diff --git a/backend/dataall/modules/dataset_sharing/services/share_processors/lakeformation_process_share.py b/backend/dataall/modules/dataset_sharing/services/share_processors/lakeformation_process_share.py index f60a57e02..2037f66e9 100644 --- a/backend/dataall/modules/dataset_sharing/services/share_processors/lakeformation_process_share.py +++ b/backend/dataall/modules/dataset_sharing/services/share_processors/lakeformation_process_share.py @@ -36,6 +36,7 @@ def __init__( def process_approved_shares(self) -> bool: """ + 0) Check if source account details are properly initialized and initialize the Glue and LF clients 1) Grant ALL permissions to pivotRole for source database in source account 2) Create the shared database in target account if it doesn't exist 3) Grant permissions to pivotRole and principals to "shared" database @@ -64,10 +65,20 @@ def process_approved_shares(self) -> bool: if not self.shared_tables: log.info("No tables to share. Skipping...") else: - self.grant_pivot_role_all_database_permissions_to_source_database() - self.check_if_exists_and_create_shared_database_in_target() - self.grant_pivot_role_all_database_permissions_to_shared_database() - self.grant_principals_database_permissions_to_shared_database() + try: + if None in [self.source_account_id, self.source_account_region, self.source_database_name]: + raise Exception( + 'Source account details not initialized properly. 
Please check if the catalog account is properly onboarded on data.all') + self.initialize_clients() + self.grant_pivot_role_all_database_permissions_to_source_database() + self.check_if_exists_and_create_shared_database_in_target() + self.grant_pivot_role_all_database_permissions_to_shared_database() + self.grant_principals_database_permissions_to_shared_database() + except Exception as e: + log.error(f"Failed to process approved tables due to {e}") + self.handle_share_failure_for_all_tables(tables=self.shared_tables, error=e, + share_item_status=ShareItemStatus.Share_Approved.value) + return False for table in self.shared_tables: log.info(f"Sharing table {table.GlueTableName}...") @@ -97,24 +108,20 @@ def process_approved_shares(self) -> bool: ( retry_share_table, failed_invitations, - ) = RamClient.accept_ram_invitation( - source_account_id=self.source_environment.AwsAccountId, - source_region=self.source_environment.region, - target_account_id=self.target_environment.AwsAccountId, - target_region=self.target_environment.region, - source_database=self.dataset.GlueDatabaseName, - source_table=table - ) + ) = RamClient.accept_ram_invitation(source_account_id=self.source_account_id, + source_region=self.source_account_region, + source_database=self.source_database_name, + source_table=table, + target_account_id=self.target_environment.AwsAccountId, + target_region=self.target_environment.region) if retry_share_table: self.grant_target_account_permissions_to_source_table(table) - RamClient.accept_ram_invitation( - source_account_id=self.source_environment.AwsAccountId, - source_region=self.source_environment.region, - target_account_id=self.target_environment.AwsAccountId, - target_region=self.target_environment.region, - source_database=self.dataset.GlueDatabaseName, - source_table=table - ) + RamClient.accept_ram_invitation(source_account_id=self.source_account_id, + source_region=self.source_account_region, + source_database=self.source_database_name, + 
source_table=table, + target_account_id=self.target_environment.AwsAccountId, + target_region=self.target_environment.region) self.check_if_exists_and_create_resource_link_table_in_shared_database(table) self.grant_principals_permissions_to_table_in_target(table) self.grant_principals_permissions_to_resource_link_table(table) @@ -133,7 +140,9 @@ def process_approved_shares(self) -> bool: def process_revoked_shares(self) -> bool: """ - 1) For each revoked table: + 0) Check if source account details are properly initialized and initialize the Glue and LF clients + 1) Grant Pivot Role all database permissions to the shared database + 2) For each revoked table: a) Update its status to REVOKE_IN_PROGRESS with Action Start b) Check if table exists on glue catalog raise error if not and flag share item status to failed c) Check if resource link table exists in target account @@ -143,8 +152,8 @@ def process_revoked_shares(self) -> bool: g) If c is True and (old-share or (new-share and d is True, no other shares of this table)) then delete resource link table g) If d is True (no other shares of this table with target), revoke permissions to target account to the original table h) update share item status to REVOKE_SUCCESSFUL with Action Success - 2) Check if there are existing_shared_tables for this dataset with target environment - 3) If no existing_shared_tables, delete shared database + 3) Check if there are existing_shared_tables for this dataset with target environment + 4) If no existing_shared_tables, delete shared database Returns ------- @@ -155,7 +164,18 @@ def process_revoked_shares(self) -> bool: '##### Starting Revoking tables #######' ) success = True - self.grant_pivot_role_all_database_permissions_to_shared_database() + try: + if None in [self.source_account_id, self.source_account_region, self.source_database_name]: + raise Exception( + 'Source account details not initialized properly. 
Please check if the catalog account is properly onboarded on data.all') + self.initialize_clients() + self.grant_pivot_role_all_database_permissions_to_shared_database() + except Exception as e: + log.error(f"Failed to process revoked tables due to {e}") + self.handle_share_failure_for_all_tables(tables=self.revoked_tables, error=e, + share_item_status=ShareItemStatus.Revoke_Approved.value) + return False + for table in self.revoked_tables: share_item = ShareObjectRepository.find_sharable_item( self.session, self.share.shareUri, table.tableUri diff --git a/tests/modules/datasets/tasks/test_lf_share_manager.py b/tests/modules/datasets/tasks/test_lf_share_manager.py index cb7c48de6..a1529c525 100644 --- a/tests/modules/datasets/tasks/test_lf_share_manager.py +++ b/tests/modules/datasets/tasks/test_lf_share_manager.py @@ -118,6 +118,8 @@ def processor_with_mocks(db, dataset1, share, table1, table2, source_environment ) mock_glue_client().get_glue_database.return_value = False + mock_glue_client().get_source_catalog.return_value = None + with db.scoped_session() as session: processor = ProcessLakeFormationShare( session, @@ -149,7 +151,13 @@ def processor_with_mocks(db, dataset1, share, table1, table2, source_environment glue_mock_client ) - yield processor, lf_mock_client, glue_mock_client + mocker.patch.object( + processor, + "glue_client_in_source", + glue_mock_client + ) + + yield processor, lf_mock_client, glue_mock_client, mock_glue_client @@ -163,7 +171,7 @@ def mock_glue_client(mocker): yield mock_client def test_init(processor_with_mocks): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks assert processor.dataset assert processor.share assert processor.shared_db_name == 'gluedatabase_shared' @@ -176,7 +184,7 @@ def test_get_share_principals( share: ShareObject, ): # Given a dataset and its share, build db_share name - processor, lf_client, glue_client = processor_with_mocks 
+ processor, lf_client, glue_client , mock_glue_client = processor_with_mocks get_iam_role_arn_mock = mocker.patch( "dataall.base.aws.iam.IAM.get_role_arn_by_name", side_effect = lambda account_id, role_name : f"arn:aws:iam::{account_id}:role/{role_name}" @@ -193,7 +201,7 @@ def test_build_shared_db_name( mock_glue_client ): # Given a new share, build db_share name - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks mock_glue_client().get_glue_database.return_value = False # Reset to remove call in __init__ mock_glue_client().get_glue_database.reset_mock() @@ -208,7 +216,7 @@ def test_build_shared_db_name_old( mock_glue_client ): # Given an existing old share (shared db name with shareUri), build db_share name - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks mock_glue_client().get_glue_database.return_value = True # Reset to remove call in __init__ mock_glue_client().get_glue_database.reset_mock() @@ -222,16 +230,16 @@ def test_check_table_exists_in_source_database( share_item: ShareObjectItem, mock_glue_client ): - processor, lf_client, glue_client = processor_with_mocks - mock_glue_client().table_exists.return_value = True + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks + glue_client.table_exists.return_value = True # When processor.check_table_exists_in_source_database( share_item=share_item, table=table1 ) # Then - mock_glue_client().table_exists.assert_called_once() - mock_glue_client().table_exists.assert_called_with(table1.GlueTableName) + glue_client.table_exists.assert_called_once() + glue_client.table_exists.assert_called_with(table1.GlueTableName) def test_check_table_exists_in_source_database_exception( processor_with_mocks, @@ -239,8 +247,8 @@ def test_check_table_exists_in_source_database_exception( share_item: ShareObjectItem, mock_glue_client ): - 
processor, lf_client, glue_client = processor_with_mocks - mock_glue_client().table_exists.return_value = False + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks + glue_client.table_exists.return_value = False # When with pytest.raises(Exception) as exception: processor.check_table_exists_in_source_database( @@ -248,42 +256,40 @@ def test_check_table_exists_in_source_database_exception( table=table1 ) #Then - mock_glue_client().table_exists.assert_called_once() - mock_glue_client().table_exists.assert_called_with(table1.GlueTableName) + glue_client.table_exists.assert_called_once() + glue_client.table_exists.assert_called_with(table1.GlueTableName) assert "ExceptionInfo" in str(exception) def test_check_resource_link_table_exists_in_target_database_true( processor_with_mocks, - table1: DatasetTable, - mock_glue_client + table1: DatasetTable ): - processor, lf_client, glue_client = processor_with_mocks - mock_glue_client().table_exists.return_value = True + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks + glue_client.table_exists.return_value = True # When response = processor.check_resource_link_table_exists_in_target_database(table=table1) # Then assert response == True - mock_glue_client().table_exists.assert_called_once() - mock_glue_client().table_exists.assert_called_with(table1.GlueTableName) + glue_client.table_exists.assert_called_once() + glue_client.table_exists.assert_called_with(table1.GlueTableName) def test_check_resource_link_table_exists_in_target_database_false( processor_with_mocks, table1: DatasetTable, - mock_glue_client ): - processor, lf_client, glue_client = processor_with_mocks - mock_glue_client().table_exists.return_value = False + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks + glue_client.table_exists.return_value = False # Then assert processor.check_resource_link_table_exists_in_target_database(table=table1) == False - 
mock_glue_client().table_exists.assert_called_once() - mock_glue_client().table_exists.assert_called_with(table1.GlueTableName) + glue_client.table_exists.assert_called_once() + glue_client.table_exists.assert_called_with(table1.GlueTableName) def test_revoke_iam_allowed_principals_from_table( processor_with_mocks, table1: DatasetTable, source_environment ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks lf_client.revoke_permissions_from_table.return_value = True # Then assert processor.revoke_iam_allowed_principals_from_table(table1) == True @@ -302,7 +308,7 @@ def test_grant_pivot_role_all_database_permissions_to_source_database( source_environment: Environment, mocker ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks lf_client.grant_permissions_to_database.return_value = True mocker.patch( "dataall.base.aws.sts.SessionHelper.get_delegation_role_arn", @@ -322,7 +328,7 @@ def test_check_if_exists_and_create_shared_database_in_target( processor_with_mocks, dataset1: Dataset ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks glue_client.create_database.return_value = True # When processor.check_if_exists_and_create_shared_database_in_target() @@ -336,7 +342,7 @@ def test_grant_pivot_role_all_database_permissions_to_shared_database( dataset1: Dataset, mocker ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks mocker.patch( "dataall.base.aws.sts.SessionHelper.get_delegation_role_arn", return_value="arn:role", @@ -356,7 +362,7 @@ def test_grant_principals_database_permissions_to_shared_database( processor_with_mocks, dataset1: Dataset ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, 
glue_client , mock_glue_client = processor_with_mocks # When processor.grant_principals_database_permissions_to_shared_database() # Then @@ -373,7 +379,7 @@ def test_grant_target_account_permissions_to_source_table( source_environment: Environment, table1: DatasetTable ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks # When processor.grant_target_account_permissions_to_source_table(table1) # Then @@ -391,39 +397,38 @@ def test_grant_target_account_permissions_to_source_table( def test_check_if_exists_and_create_resource_link_table_in_shared_database_false( processor_with_mocks, table1: DatasetTable, - source_environment: Environment, - mock_glue_client + source_environment: Environment ): - processor, lf_client, glue_client = processor_with_mocks - mock_glue_client().table_exists.return_value = False + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks + glue_client.table_exists.return_value = False glue_client.create_resource_link.return_value = True # When processor.check_if_exists_and_create_resource_link_table_in_shared_database(table1) # Then - mock_glue_client().table_exists.assert_called_once() + glue_client.table_exists.assert_called_once() glue_client.create_resource_link.assert_called_once() glue_client.create_resource_link.assert_called_with( resource_link_name=table1.GlueTableName, + database=table1.GlueDatabaseName, table=table1, catalog_id=source_environment.AwsAccountId ) def test_check_if_exists_and_create_resource_link_table_in_shared_database_true( processor_with_mocks, - table1: DatasetTable, - mock_glue_client + table1: DatasetTable ): - processor, lf_client, glue_client = processor_with_mocks - mock_glue_client().table_exists.return_value = True + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks + glue_client.table_exists.return_value = True glue_client.create_resource_link.return_value = True # When 
processor.check_if_exists_and_create_resource_link_table_in_shared_database(table1) # Then - mock_glue_client().table_exists.assert_called_once() + glue_client.table_exists.assert_called_once() glue_client.create_resource_link.assert_not_called() @@ -432,7 +437,7 @@ def test_grant_principals_permissions_to_resource_link_table( table1: DatasetTable, target_environment: Environment ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks # When processor.grant_principals_permissions_to_resource_link_table(table1) # Then @@ -452,7 +457,7 @@ def test_grant_pivot_role_drop_permissions_to_resource_link_table( target_environment: Environment, mocker ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks mocker.patch( "dataall.base.aws.sts.SessionHelper.get_delegation_role_arn", return_value="arn:role", @@ -475,7 +480,7 @@ def test_grant_principals_permissions_to_table_in_target( table1: DatasetTable, source_environment: Environment ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks # When processor.grant_principals_permissions_to_table_in_target(table1) # Then @@ -493,7 +498,7 @@ def test_revoke_principals_permissions_to_resource_link_table( table1: DatasetTable, target_environment:Environment ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks # When processor.revoke_principals_permissions_to_resource_link_table(table=table1) # Then @@ -512,7 +517,7 @@ def test_revoke_principals_permissions_to_table_in_target( table1: DatasetTable, source_environment: Environment ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks # When 
processor.revoke_principals_permissions_to_table_in_target(table = table1, other_table_shares_in_env=False) # Then @@ -529,7 +534,7 @@ def test_delete_resource_link_table_in_shared_database_true( processor_with_mocks, table2: DatasetTable ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks # When processor.delete_resource_link_table_in_shared_database( @@ -543,7 +548,7 @@ def test_revoke_principals_database_permissions_to_shared_database( processor_with_mocks, dataset1: Dataset ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks # When processor.revoke_principals_database_permissions_to_shared_database() # Then @@ -557,7 +562,7 @@ def test_revoke_principals_database_permissions_to_shared_database( def test_delete_shared_database_in_target( processor_with_mocks, ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks # When processor.delete_shared_database_in_target() # Then @@ -570,7 +575,7 @@ def test_revoke_external_account_access_on_source_account( source_environment: Environment, target_environment: Environment ): - processor, lf_client, glue_client = processor_with_mocks + processor, lf_client, glue_client , mock_glue_client = processor_with_mocks # When processor.revoke_external_account_access_on_source_account(table1) # Then @@ -584,6 +589,126 @@ def test_revoke_external_account_access_on_source_account( permissions_with_grant_options=['DESCRIBE', 'SELECT'] ) +def test_check_catalog_account_exists_and_update_processor_with_catalog_exists( + db, dataset1, share, table1, table2, source_environment, target_environment, + target_environment_group, mock_glue_client, + mocker +): + # Mock glue and sts calls to create a LF processor + mocker.patch( + "dataall.base.aws.sts.SessionHelper.remote_session", + 
return_value=boto3.Session(), + ) + mocker.patch( + "dataall.base.aws.iam.IAM.get_role_arn_by_name", + side_effect=lambda account_id, role_name: f"arn:aws:iam::{account_id}:role/{role_name}" + ) + mock_glue_client().get_glue_database.return_value = False + + mock_glue_client().get_source_catalog.return_value = {'account_id' : '12129101212', 'database_name' : 'catalog_db', 'region' : 'us-east-1'} + + mock_glue_client().get_database_tags.return_value = {'owner_account_id': source_environment.AwsAccountId} + + mocker.patch("dataall.base.aws.sts.SessionHelper.is_assumable_pivot_role", return_value=True) + + # when + with db.scoped_session() as session: + processor = ProcessLakeFormationShare( + session, + dataset1, + share, + [table1], + [table2], + source_environment, + target_environment, + target_environment_group, + ) + + # Check if the catalog account exists + assert processor.check_catalog_account_exists_and_verify() == True + # Then + # Check the source account id. source account database and region to check if it is updated + assert processor.source_account_id == '12129101212' + assert processor.source_database_name == 'catalog_db' + assert processor.source_account_region == 'us-east-1' + + # Check the shared database + assert processor.shared_db_name == 'catalog_db' + '_shared' + +def test_check_catalog_account_exists_and_update_processor_with_catalog_exists_and_pivot_role_not_assumable( + processor_with_mocks, + table1: DatasetTable, + source_environment: Environment, + target_environment: Environment, + mocker +): + # Given + processor, lf_client, glue_client, mock_glue_client = processor_with_mocks + + # Override the mocks to check catalog account details + mock_glue_client().get_source_catalog.return_value = {'account_id' : '12129101212', 'database_name' : 'catalog_db', 'region' : 'us-east-1'} + + mock_glue_client().get_database_tags.return_value = {'owner_account_id' : source_environment.AwsAccountId} + + mock_glue_client().get_glue_database.return_value 
= False + + mocker.patch("dataall.base.aws.sts.SessionHelper.is_assumable_pivot_role", return_value=False) + + # When + with pytest.raises(Exception) as exception: + processor._verify_catalog_ownership('12129101212', 'us-east-1', 'catalog_db') + + # Then + assert "Pivot role is not assumable" in str(exception) + assert processor.check_catalog_account_exists_and_verify() is None + +def test_check_catalog_account_exists_and_update_processor_with_catalog_exists_and_tag_doesnt_exists( + processor_with_mocks, + table1: DatasetTable, + source_environment: Environment, + target_environment: Environment, + mocker +): + # Given + processor, lf_client, glue_client, mock_glue_client = processor_with_mocks + + # Override the mocks to check catalog account details + mock_glue_client().get_source_catalog.return_value = {'account_id': '12129101212', 'database_name': 'catalog_db', + 'region': 'us-east-1'} + + mock_glue_client().get_database_tags.return_value = {'owner_account_id' : 'NotTheSourceAccountID'} + + mock_glue_client().get_glue_database.return_value = False + + mocker.patch("dataall.base.aws.sts.SessionHelper.is_assumable_pivot_role", return_value=True) + + # when + with pytest.raises(Exception) as exception: + processor._validate_catalog_ownership_tag('12129101212', 'us-east-1', 'catalog_db') + + # then + assert processor.check_catalog_account_exists_and_verify() is None + assert "owner_account_id tag does not exist or does not matches the source account id" in str(exception) + +def test_check_catalog_account_exists_and_update_processor_with_catalog_doesnt_exists( + processor_with_mocks, + table1: DatasetTable, + source_environment: Environment, + target_environment: Environment +): + processor, lf_client, glue_client, mock_glue_client = processor_with_mocks + + # When + assert processor.check_catalog_account_exists_and_verify() == False + + # Then + # Check the source account id. 
source account database and region to check if it is updated + assert processor.source_account_id == source_environment.AwsAccountId + assert processor.source_database_name == processor.dataset.GlueDatabaseName + assert processor.source_account_region == source_environment.region + + # Check the shared database + assert processor.shared_db_name == processor.dataset.GlueDatabaseName + "_shared" def test_handle_share_failure( processor_with_mocks, @@ -592,8 +717,8 @@ def test_handle_share_failure( ): # Given alarm_service_mock = mocker.patch.object(DatasetAlarmService, "trigger_table_sharing_failure_alarm") - error = Exception - processor, lf_client, glue_client = processor_with_mocks + error = Exception() + processor, lf_client, glue_client, mock_glue_client = processor_with_mocks # When processor.handle_share_failure(table1, error) @@ -611,8 +736,8 @@ def test_handle_revoke_failure( # Given alarm_service_mock = mocker.patch.object(DatasetAlarmService, "trigger_revoke_table_sharing_failure_alarm") - error = Exception - processor, lf_client, glue_client = processor_with_mocks + error = Exception() + processor, lf_client, glue_client, mock_glue_client = processor_with_mocks # When processor.handle_revoke_failure(table1, error)