From 7aef3ad236cb297c8e42e516a763719584e0e03a Mon Sep 17 00:00:00 2001
From: Anushka Singh
Date: Wed, 28 Feb 2024 10:06:51 -0500
Subject: [PATCH] Feature 941 (#1072)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### Feature or Bugfix
- Feature

### Detail
Share requests running in parallel on the same dataset override one another. This change adds a dataset-level locking mechanism: a new `dataset_lock` table and acquire/release logic in `DataSharingService`, so that only one share or revoke task processes a given dataset at a time.

### Relates
- https://github.com/data-dot-all/dataall/issues/941

### Testing
- Two simultaneous shares were processed successfully with the locking mechanism.
- The `dataset_lock` table and its values after the runs are shown in the PR screenshot (Screenshot 2024-02-14 at 5 12 54 PM).
- The `acquiredBy` column shows the share that last acquired the lock for the particular dataset.

### Security
Please answer the questions below briefly where applicable, or write `N/A`. Based on [OWASP 10](https://owasp.org/Top10/en/).

- Does this PR introduce or modify any input fields or queries - this includes fetching data from storage outside the application (e.g. a database, an S3 bucket)?
  - Is the input sanitized?
  - What precautions are you taking before deserializing the data you consume?
  - Is injection prevented by parametrizing queries?
  - Have you ensured no `eval` or similar functions are used?
- Does this PR introduce any functionality or component that requires authorization?
  - How have you ensured it respects the existing AuthN/AuthZ mechanisms?
  - Are you logging failed auth attempts?
- Are you using or adding any cryptographic features?
  - Do you use standard proven implementations?
  - Are the used keys controlled by the customer? Where are they stored?
- Are you introducing any new policies/roles/users?
  - Have you used the least-privilege principle? How?

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
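For reviewers, here is a minimal sketch of the row-level locking pattern this change relies on. It mirrors the `acquire_lock`/`release_lock` logic in `DataSharingService` (a `SELECT ... FOR UPDATE` on the `dataset_lock` row, flipping `isLocked` and `acquiredBy`), but the standalone helper names below are illustrative, not the exact service methods; the retry loop around acquisition is omitted here.

```python
# Illustrative sketch only: assumes a SQLAlchemy session and the DatasetLock
# model introduced in this PR (dataset_lock table with datasetUri/isLocked/acquiredBy).
from sqlalchemy import and_

from dataall.modules.datasets_base.db.dataset_models import DatasetLock


def try_acquire_lock(session, dataset_uri: str, share_uri: str) -> bool:
    """Atomically set isLocked=True if the dataset row is currently unlocked."""
    lock_row = (
        session.query(DatasetLock)
        .filter(and_(DatasetLock.datasetUri == dataset_uri, ~DatasetLock.isLocked))
        .with_for_update()  # row lock so two workers cannot both see isLocked == False
        .first()
    )
    if lock_row is None:
        return False  # no lock row, or another share currently holds the lock
    lock_row.isLocked = True
    lock_row.acquiredBy = share_uri
    session.commit()
    return True


def release_lock(session, dataset_uri: str, share_uri: str) -> bool:
    """Release the lock, but only if it is held by the given share."""
    lock_row = (
        session.query(DatasetLock)
        .filter(
            and_(
                DatasetLock.datasetUri == dataset_uri,
                DatasetLock.isLocked.is_(True),
                DatasetLock.acquiredBy == share_uri,
            )
        )
        .with_for_update()
        .first()
    )
    if lock_row is None:
        return False
    lock_row.isLocked = False
    lock_row.acquiredBy = ''
    session.commit()
    return True
```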
--------- Signed-off-by: dependabot[bot] Co-authored-by: Noah Paige <69586985+noah-paige@users.noreply.github.com> Co-authored-by: dlpzx <71252798+dlpzx@users.noreply.github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: jaidisido Co-authored-by: dlpzx Co-authored-by: mourya-33 <134511711+mourya-33@users.noreply.github.com> Co-authored-by: nikpodsh <124577300+nikpodsh@users.noreply.github.com> Co-authored-by: MK Co-authored-by: Manjula Co-authored-by: Zilvinas Saltys Co-authored-by: Zilvinas Saltys Co-authored-by: Daniel Lorch <98748454+lorchda@users.noreply.github.com> Co-authored-by: Anushka Singh Co-authored-by: Tejas Rajopadhye <71188245+TejasRGitHub@users.noreply.github.com> Co-authored-by: trajopadhye --- .../db/share_object_repositories.py | 16 + .../services/data_sharing_service.py | 512 +++++++++++++----- .../services/dataset_sharing_enums.py | 1 + .../datasets_base/db/dataset_models.py | 12 + ...e29_implement_dataset_locking_mechanism.py | 157 ++++++ 5 files changed, 554 insertions(+), 144 deletions(-) create mode 100644 backend/migrations/versions/a1049d854e29_implement_dataset_locking_mechanism.py diff --git a/backend/dataall/modules/dataset_sharing/db/share_object_repositories.py b/backend/dataall/modules/dataset_sharing/db/share_object_repositories.py index 44e637cd5..c1cb05d28 100644 --- a/backend/dataall/modules/dataset_sharing/db/share_object_repositories.py +++ b/backend/dataall/modules/dataset_sharing/db/share_object_repositories.py @@ -138,6 +138,15 @@ def __init__(self, state): ] } ), + ShareObjectActions.AcquireLockFailure.value: Transition( + name=ShareObjectActions.AcquireLockFailure.value, + transitions={ + ShareObjectStatus.Processed.value: [ + ShareObjectStatus.Share_In_Progress.value, + ShareObjectStatus.Revoke_In_Progress.value + ] + } + ), } def run_transition(self, transition): @@ -258,6 +267,13 @@ def __init__(self, state): ShareItemStatus.Revoke_Succeeded.value ] } + ), + ShareObjectActions.AcquireLockFailure.value: Transition( + name=ShareObjectActions.AcquireLockFailure.value, + transitions={ + ShareItemStatus.Share_Failed.value: [ShareItemStatus.Share_Approved.value], + ShareItemStatus.Revoke_Failed.value: [ShareItemStatus.Revoke_Approved.value], + } ) } diff --git a/backend/dataall/modules/dataset_sharing/services/data_sharing_service.py b/backend/dataall/modules/dataset_sharing/services/data_sharing_service.py index 1fb3b449e..595291ab0 100644 --- a/backend/dataall/modules/dataset_sharing/services/data_sharing_service.py +++ b/backend/dataall/modules/dataset_sharing/services/data_sharing_service.py @@ -1,5 +1,8 @@ import logging +from sqlalchemy import and_ +from time import sleep + from dataall.base.db import Engine from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectSM, ShareObjectRepository, \ ShareItemSM @@ -10,9 +13,14 @@ from dataall.modules.dataset_sharing.services.share_processors.s3_bucket_process_share import ProcessS3BucketShare from dataall.modules.dataset_sharing.services.dataset_sharing_enums import (ShareObjectActions, ShareItemStatus, ShareableType) +from dataall.modules.datasets_base.db.dataset_models import DatasetLock + log = logging.getLogger(__name__) +MAX_RETRIES = 10 +RETRY_INTERVAL = 60 + class DataSharingService: def __init__(self): @@ -38,71 +46,118 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool: True if sharing succeeds, False if folder or table sharing failed """ - with engine.scoped_session() as session: - ( + try: + 
with engine.scoped_session() as session: + ( + source_env_group, + env_group, + dataset, + share, + source_environment, + target_environment, + ) = ShareObjectRepository.get_share_data(session, share_uri) + + share_sm = ShareObjectSM(share.status) + new_share_state = share_sm.run_transition(ShareObjectActions.Start.value) + share_sm.update_state(session, share, new_share_state) + + ( + shared_tables, + shared_folders, + shared_buckets + ) = ShareObjectRepository.get_share_data_items(session, share_uri, ShareItemStatus.Share_Approved.value) + + lock_acquired = cls.acquire_lock_with_retry(dataset.datasetUri, session, share.shareUri) + + if not lock_acquired: + log.error(f"Failed to acquire lock for dataset {dataset.datasetUri}. Exiting...") + for table in shared_tables: + share_item = ShareObjectRepository.find_sharable_item( + session, share_uri, table.tableUri + ) + cls.handle_share_items_failure_during_locking( + session, share_item, ShareItemStatus.Share_Approved.value + ) + + for folder in shared_folders: + share_item = ShareObjectRepository.find_sharable_item( + session, + share_uri, + folder.locationUri, + ) + cls.handle_share_items_failure_during_locking( + session, share_item, ShareItemStatus.Share_Approved.value + ) + + for bucket in shared_buckets: + share_item = ShareObjectRepository.find_sharable_item( + session, + share_uri, + bucket.bucketUri, + ) + cls.handle_share_items_failure_during_locking( + session, share_item, ShareItemStatus.Share_Approved.value + ) + + share_object_SM = ShareObjectSM(share.status) + new_object_state = share_object_SM.run_transition(ShareObjectActions.AcquireLockFailure.value) + share_object_SM.update_state(session, share, new_object_state) + return False + + log.info(f'Granting permissions to folders: {shared_folders}') + + approved_folders_succeed = ProcessS3AccessPointShare.process_approved_shares( + session, + dataset, + share, + shared_folders, + source_environment, + target_environment, source_env_group, - env_group, + env_group + ) + log.info(f'sharing folders succeeded = {approved_folders_succeed}') + + log.info('Granting permissions to S3 buckets') + + approved_s3_buckets_succeed = ProcessS3BucketShare.process_approved_shares( + session, dataset, share, + shared_buckets, source_environment, target_environment, - ) = ShareObjectRepository.get_share_data(session, share_uri) + source_env_group, + env_group + ) + log.info(f'sharing s3 buckets succeeded = {approved_s3_buckets_succeed}') - share_sm = ShareObjectSM(share.status) - new_share_state = share_sm.run_transition(ShareObjectActions.Start.value) + log.info(f'Granting permissions to tables: {shared_tables}') + approved_tables_succeed = ProcessLakeFormationShare( + session, + dataset, + share, + shared_tables, + [], + source_environment, + target_environment, + env_group, + ).process_approved_shares() + log.info(f'sharing tables succeeded = {approved_tables_succeed}') + + new_share_state = share_sm.run_transition(ShareObjectActions.Finish.value) share_sm.update_state(session, share, new_share_state) - ( - shared_tables, - shared_folders, - shared_buckets - ) = ShareObjectRepository.get_share_data_items(session, share_uri, ShareItemStatus.Share_Approved.value) - - log.info(f'Granting permissions to folders: {shared_folders}') - - approved_folders_succeed = ProcessS3AccessPointShare.process_approved_shares( - session, - dataset, - share, - shared_folders, - source_environment, - target_environment, - source_env_group, - env_group - ) - log.info(f'sharing folders succeeded = 
{approved_folders_succeed}') - - log.info('Granting permissions to S3 buckets') - - approved_s3_buckets_succeed = ProcessS3BucketShare.process_approved_shares( - session, - dataset, - share, - shared_buckets, - source_environment, - target_environment, - source_env_group, - env_group - ) - log.info(f'sharing s3 buckets succeeded = {approved_s3_buckets_succeed}') - - log.info(f'Granting permissions to tables: {shared_tables}') - approved_tables_succeed = ProcessLakeFormationShare( - session, - dataset, - share, - shared_tables, - [], - source_environment, - target_environment, - env_group, - ).process_approved_shares() - log.info(f'sharing tables succeeded = {approved_tables_succeed}') - - new_share_state = share_sm.run_transition(ShareObjectActions.Finish.value) - share_sm.update_state(session, share, new_share_state) - - return approved_folders_succeed and approved_s3_buckets_succeed and approved_tables_succeed + return approved_folders_succeed and approved_s3_buckets_succeed and approved_tables_succeed + + except Exception as e: + log.error(f"Error occurred during share approval: {e}") + return False + + finally: + lock_released = cls.release_lock(dataset.datasetUri, session, share.shareUri) + if not lock_released: + log.error(f"Failed to release lock for dataset {dataset.datasetUri}.") @classmethod def revoke_share(cls, engine: Engine, share_uri: str): @@ -126,103 +181,272 @@ def revoke_share(cls, engine: Engine, share_uri: str): True if revoke succeeds False if folder or table revoking failed """ + try: + with engine.scoped_session() as session: + ( + source_env_group, + env_group, + dataset, + share, + source_environment, + target_environment, + ) = ShareObjectRepository.get_share_data(session, share_uri) + + share_sm = ShareObjectSM(share.status) + new_share_state = share_sm.run_transition(ShareObjectActions.Start.value) + share_sm.update_state(session, share, new_share_state) + + revoked_item_sm = ShareItemSM(ShareItemStatus.Revoke_Approved.value) + + ( + revoked_tables, + revoked_folders, + revoked_buckets + ) = ShareObjectRepository.get_share_data_items(session, share_uri, ShareItemStatus.Revoke_Approved.value) + + lock_acquired = cls.acquire_lock_with_retry(dataset.datasetUri, session, share.shareUri) + + if not lock_acquired: + log.error(f"Failed to acquire lock for dataset {dataset.datasetUri}. 
Exiting...") + for table in revoked_tables: + share_item = ShareObjectRepository.find_sharable_item( + session, share_uri, table.tableUri + ) + cls.handle_share_items_failure_during_locking( + session, share_item, ShareItemStatus.Revoke_Approved.value + ) + + for folder in revoked_folders: + share_item = ShareObjectRepository.find_sharable_item( + session, + share_uri, + folder.locationUri, + ) + cls.handle_share_items_failure_during_locking( + session, share_item, ShareItemStatus.Revoke_Approved.value + ) + + for bucket in revoked_buckets: + share_item = ShareObjectRepository.find_sharable_item( + session, + share_uri, + bucket.bucketUri, + ) + cls.handle_share_items_failure_during_locking( + session, share_item, ShareItemStatus.Revoke_Approved.value + ) + + share_object_SM = ShareObjectSM(share.status) + new_object_state = share_object_SM.run_transition(ShareObjectActions.AcquireLockFailure.value) + share_object_SM.update_state(session, share, new_object_state) + return False + + new_state = revoked_item_sm.run_transition(ShareObjectActions.Start.value) + revoked_item_sm.update_state(session, share_uri, new_state) + + log.info(f'Revoking permissions to folders: {revoked_folders}') + + revoked_folders_succeed = ProcessS3AccessPointShare.process_revoked_shares( + session, + dataset, + share, + revoked_folders, + source_environment, + target_environment, + source_env_group, + env_group, + ) + log.info(f'revoking folders succeeded = {revoked_folders_succeed}') + existing_shared_folders = ShareObjectRepository.check_existing_shared_items_of_type( + session, + share_uri, + ShareableType.StorageLocation.value + ) + existing_shared_buckets = ShareObjectRepository.check_existing_shared_items_of_type( + session, + share_uri, + ShareableType.S3Bucket.value + ) + existing_shared_items = existing_shared_folders or existing_shared_buckets + log.info(f'Still remaining S3 resources shared = {existing_shared_items}') + if not existing_shared_folders and revoked_folders: + log.info("Clean up S3 access points...") + clean_up_folders = ProcessS3AccessPointShare.clean_up_share( + session, + dataset=dataset, + share=share, + folder=revoked_folders[0], + source_environment=source_environment, + target_environment=target_environment, + source_env_group=source_env_group, + env_group=env_group + ) + log.info(f"Clean up S3 successful = {clean_up_folders}") + + log.info('Revoking permissions to S3 buckets') + + revoked_s3_buckets_succeed = ProcessS3BucketShare.process_revoked_shares( + session, + dataset, + share, + revoked_buckets, + source_environment, + target_environment, + source_env_group, + env_group + ) + log.info(f'revoking s3 buckets succeeded = {revoked_s3_buckets_succeed}') - with engine.scoped_session() as session: - ( - source_env_group, - env_group, - dataset, - share, - source_environment, - target_environment, - ) = ShareObjectRepository.get_share_data(session, share_uri) + log.info(f'Revoking permissions to tables: {revoked_tables}') + revoked_tables_succeed = ProcessLakeFormationShare( + session, + dataset, + share, + [], + revoked_tables, + source_environment, + target_environment, + env_group, + ).process_revoked_shares() + log.info(f'revoking tables succeeded = {revoked_tables_succeed}') + + existing_pending_items = ShareObjectRepository.check_pending_share_items(session, share_uri) + if existing_pending_items: + new_share_state = share_sm.run_transition(ShareObjectActions.FinishPending.value) + else: + new_share_state = share_sm.run_transition(ShareObjectActions.Finish.value) + 
share_sm.update_state(session, share, new_share_state) + + return revoked_folders_succeed and revoked_s3_buckets_succeed and revoked_tables_succeed + + except Exception as e: + log.error(f"Error occurred during share revoking: {e}") + return False + + finally: + lock_released = cls.release_lock(dataset.datasetUri, session, share.shareUri) + if not lock_released: + log.error(f"Failed to release lock for dataset {dataset.datasetUri}.") + + @staticmethod + def acquire_lock(dataset_uri, session, share_uri): + """ + Attempts to acquire a lock on the dataset identified by dataset_id. - share_sm = ShareObjectSM(share.status) - new_share_state = share_sm.run_transition(ShareObjectActions.Start.value) - share_sm.update_state(session, share, new_share_state) + Args: + dataset_uri: The ID of the dataset for which the lock is being acquired. + session (sqlalchemy.orm.Session): The SQLAlchemy session object used for interacting with the database. + share_uri: The ID of the share that is attempting to acquire the lock. - revoked_item_sm = ShareItemSM(ShareItemStatus.Revoke_Approved.value) + Returns: + bool: True if the lock is successfully acquired, False otherwise. + """ + try: + # Execute the query to get the DatasetLock object + dataset_lock = ( + session.query(DatasetLock) + .filter( + and_( + DatasetLock.datasetUri == dataset_uri, + ~DatasetLock.isLocked + ) + ) + .with_for_update().first() + ) - ( - revoked_tables, - revoked_folders, - revoked_buckets - ) = ShareObjectRepository.get_share_data_items(session, share_uri, ShareItemStatus.Revoke_Approved.value) + # Check if dataset_lock is not None before attempting to update + if dataset_lock: + # Update the attributes of the DatasetLock object + dataset_lock.isLocked = True + dataset_lock.acquiredBy = share_uri - new_state = revoked_item_sm.run_transition(ShareObjectActions.Start.value) - revoked_item_sm.update_state(session, share_uri, new_state) + session.commit() + return True + else: + log.info("DatasetLock not found for the given criteria.") + return False + + except Exception as e: + session.expunge_all() + session.rollback() + log.error("Error occurred while acquiring lock:", e) + return False + + @staticmethod + def acquire_lock_with_retry(dataset_uri, session, share_uri): + for attempt in range(MAX_RETRIES): + try: + log.info(f"Attempting to acquire lock for dataset {dataset_uri} by share {share_uri}...") + lock_acquired = DataSharingService.acquire_lock(dataset_uri, session, share_uri) + if lock_acquired: + return True + + log.info( + f"Lock for dataset {dataset_uri} already acquired. Retrying in {RETRY_INTERVAL} seconds...") + sleep(RETRY_INTERVAL) + + except Exception as e: + log.error("Error occurred while retrying acquiring lock:", e) + return False + + log.info(f"Max retries reached. Failed to acquire lock for dataset {dataset_uri}") + return False + + @staticmethod + def release_lock(dataset_uri, session, share_uri): + """ + Releases the lock on the dataset identified by dataset_uri. - log.info(f'Revoking permissions to folders: {revoked_folders}') + Args: + dataset_uri: The ID of the dataset for which the lock is being released. + session (sqlalchemy.orm.Session): The SQLAlchemy session object used for interacting with the database. + share_uri: The ID of the share that is attempting to release the lock. 
- revoked_folders_succeed = ProcessS3AccessPointShare.process_revoked_shares( - session, - dataset, - share, - revoked_folders, - source_environment, - target_environment, - source_env_group, - env_group, - ) - log.info(f'revoking folders succeeded = {revoked_folders_succeed}') - existing_shared_folders = ShareObjectRepository.check_existing_shared_items_of_type( - session, - share_uri, - ShareableType.StorageLocation.value - ) - existing_shared_buckets = ShareObjectRepository.check_existing_shared_items_of_type( - session, - share_uri, - ShareableType.S3Bucket.value - ) - existing_shared_items = existing_shared_folders or existing_shared_buckets - log.info(f'Still remaining S3 resources shared = {existing_shared_items}') - if not existing_shared_folders and revoked_folders: - log.info("Clean up S3 access points...") - clean_up_folders = ProcessS3AccessPointShare.clean_up_share( - session, - dataset=dataset, - share=share, - folder=revoked_folders[0], - source_environment=source_environment, - target_environment=target_environment, - source_env_group=source_env_group, - env_group=env_group + Returns: + bool: True if the lock is successfully released, False otherwise. + """ + try: + log.info(f"Releasing lock for dataset: {dataset_uri} last acquired by share: {share_uri}") + dataset_lock = ( + session.query(DatasetLock) + .filter( + and_( + DatasetLock.datasetUri == dataset_uri, + DatasetLock.isLocked == True, + DatasetLock.acquiredBy == share_uri + ) ) - log.info(f"Clean up S3 successful = {clean_up_folders}") - - log.info('Revoking permissions to S3 buckets') - - revoked_s3_buckets_succeed = ProcessS3BucketShare.process_revoked_shares( - session, - dataset, - share, - revoked_buckets, - source_environment, - target_environment, - source_env_group, - env_group + .with_for_update().first() ) - log.info(f'revoking s3 buckets succeeded = {revoked_s3_buckets_succeed}') - log.info(f'Revoking permissions to tables: {revoked_tables}') - revoked_tables_succeed = ProcessLakeFormationShare( - session, - dataset, - share, - [], - revoked_tables, - source_environment, - target_environment, - env_group, - ).process_revoked_shares() - log.info(f'revoking tables succeeded = {revoked_tables_succeed}') + if dataset_lock: + dataset_lock.isLocked = False + dataset_lock.acquiredBy = '' - existing_pending_items = ShareObjectRepository.check_pending_share_items(session, share_uri) - if existing_pending_items: - new_share_state = share_sm.run_transition(ShareObjectActions.FinishPending.value) + session.commit() + return True else: - new_share_state = share_sm.run_transition(ShareObjectActions.Finish.value) - share_sm.update_state(session, share, new_share_state) + log.info("DatasetLock not found for the given criteria.") + return False - return revoked_folders_succeed and revoked_s3_buckets_succeed and revoked_tables_succeed + except Exception as e: + session.expunge_all() + session.rollback() + log.error("Error occurred while releasing lock:", e) + return False + + @staticmethod + def handle_share_items_failure_during_locking(session, share_item, share_item_status): + """ + If lock is not acquired successfully, mark the share items as failed. + + Args: + session (sqlalchemy.orm.Session): The SQLAlchemy session object used for interacting with the database. + share_item: The share item that needs to be marked failed during share. 
+ + Returns: + None + """ + share_item_SM = ShareItemSM(share_item_status) + new_state = share_item_SM.run_transition(ShareObjectActions.AcquireLockFailure.value) + share_item_SM.update_state_single_item(session, share_item, new_state) diff --git a/backend/dataall/modules/dataset_sharing/services/dataset_sharing_enums.py b/backend/dataall/modules/dataset_sharing/services/dataset_sharing_enums.py index 9fb593f18..05a3afda2 100644 --- a/backend/dataall/modules/dataset_sharing/services/dataset_sharing_enums.py +++ b/backend/dataall/modules/dataset_sharing/services/dataset_sharing_enums.py @@ -51,6 +51,7 @@ class ShareObjectActions(GraphQLEnumMapper): Finish = 'Finish' FinishPending = 'FinishPending' Delete = 'Delete' + AcquireLockFailure = 'AcquireLockFailure' class ShareItemActions(GraphQLEnumMapper): diff --git a/backend/dataall/modules/datasets_base/db/dataset_models.py b/backend/dataall/modules/datasets_base/db/dataset_models.py index 73edbd235..393c2cc0a 100644 --- a/backend/dataall/modules/datasets_base/db/dataset_models.py +++ b/backend/dataall/modules/datasets_base/db/dataset_models.py @@ -164,3 +164,15 @@ class DatasetBucket(Resource, Base): @classmethod def uri(cls): return cls.bucketUri + + +class DatasetLock(Base): + __tablename__ = 'dataset_lock' + datasetUri = Column(String, ForeignKey("dataset.datasetUri"), nullable=False, primary_key=True) + isLocked = Column(Boolean, default=False) + acquiredBy = Column(String, nullable=True) + + def __init__(self, datasetUri, isLocked=False, acquiredBy=None): + self.datasetUri = datasetUri + self.isLocked = isLocked + self.acquiredBy = acquiredBy diff --git a/backend/migrations/versions/a1049d854e29_implement_dataset_locking_mechanism.py b/backend/migrations/versions/a1049d854e29_implement_dataset_locking_mechanism.py new file mode 100644 index 000000000..ee1d35111 --- /dev/null +++ b/backend/migrations/versions/a1049d854e29_implement_dataset_locking_mechanism.py @@ -0,0 +1,157 @@ +"""implement_resource_locking_mechanism + +Revision ID: a1049d854e29 +Revises: f6cd4ba7dd8d +Create Date: 2024-02-01 16:38:32.533228 + +""" +import os + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import Boolean, Column, String, orm, ForeignKey +from sqlalchemy.ext.declarative import declarative_base +from dataall.base.db import get_engine, has_table +from dataall.base.db import utils, Resource +from sqlalchemy.orm import query_expression +from sqlalchemy.dialects import postgresql + + +# revision identifiers, used by Alembic. 
+revision = 'a1049d854e29' +down_revision = '6c9a8afee4e4' +branch_labels = None +depends_on = None + +Base = declarative_base() + + +class Dataset(Resource, Base): + __tablename__ = 'dataset' + environmentUri = Column(String, ForeignKey("environment.environmentUri"), nullable=False) + organizationUri = Column(String, nullable=False) + datasetUri = Column(String, primary_key=True, default=utils.uuid('dataset')) + region = Column(String, default='eu-west-1') + AwsAccountId = Column(String, nullable=False) + S3BucketName = Column(String, nullable=False) + GlueDatabaseName = Column(String, nullable=False) + GlueCrawlerName = Column(String) + GlueCrawlerSchedule = Column(String) + GlueProfilingJobName = Column(String) + GlueProfilingTriggerSchedule = Column(String) + GlueProfilingTriggerName = Column(String) + GlueDataQualityJobName = Column(String) + GlueDataQualitySchedule = Column(String) + GlueDataQualityTriggerName = Column(String) + IAMDatasetAdminRoleArn = Column(String, nullable=False) + IAMDatasetAdminUserArn = Column(String, nullable=False) + KmsAlias = Column(String, nullable=False) + userRoleForDataset = query_expression() + userRoleInEnvironment = query_expression() + isPublishedInEnvironment = query_expression() + projectPermission = query_expression() + language = Column(String, nullable=False, default='English') + topics = Column(postgresql.ARRAY(String), nullable=True) + confidentiality = Column(String, nullable=False, default='Unclassified') + tags = Column(postgresql.ARRAY(String)) + inProject = query_expression() + + bucketCreated = Column(Boolean, default=False) + glueDatabaseCreated = Column(Boolean, default=False) + iamAdminRoleCreated = Column(Boolean, default=False) + iamAdminUserCreated = Column(Boolean, default=False) + kmsAliasCreated = Column(Boolean, default=False) + lakeformationLocationCreated = Column(Boolean, default=False) + bucketPolicyCreated = Column(Boolean, default=False) + + # bookmarked = Column(Integer, default=0) + # upvotes=Column(Integer, default=0) + + businessOwnerEmail = Column(String, nullable=True) + businessOwnerDelegationEmails = Column(postgresql.ARRAY(String), nullable=True) + stewards = Column(String, nullable=True) + + SamlAdminGroupName = Column(String, nullable=True) + + importedS3Bucket = Column(Boolean, default=False) + importedGlueDatabase = Column(Boolean, default=False) + importedKmsKey = Column(Boolean, default=False) + importedAdminRole = Column(Boolean, default=False) + imported = Column(Boolean, default=False) + + +class DatasetLock(Base): + __tablename__ = 'dataset_lock' + datasetUri = Column(String, nullable=False, primary_key=True) + isLocked = Column(Boolean, default=False) + acquiredBy = Column(String, nullable=True) + + @classmethod + def uri(cls): + return cls.datasetUri + + +def upgrade(): + """ + The script does the following migration: + 1) creation of the dataset_lock table + """ + try: + envname = os.getenv('envname', 'local') + print('ENVNAME', envname) + engine = get_engine(envname=envname).engine + + bind = op.get_bind() + session = orm.Session(bind=bind) + datasets: [Dataset] = session.query(Dataset).all() + + if not has_table('dataset_lock', engine): + print("Creating dataset_lock table") + + op.create_table( + 'dataset_lock', + sa.Column('datasetUri', sa.String(), primary_key=True), + sa.Column('isLocked', sa.Boolean(), nullable=False), + sa.Column('acquiredBy', sa.String(), nullable=True), + ) + + op.create_foreign_key( + 'fk_dataset_lock_datasetUri', # Constraint name + 'dataset_lock', 'dataset', + 
['datasetUri'], ['datasetUri'] + ) + + print('Creating a new row for each existing dataset in dataset_lock table') + for dataset in datasets: + dataset_lock = DatasetLock( + datasetUri=dataset.datasetUri, + isLocked=False, + acquiredBy='' + ) + session.add(dataset_lock) + session.flush() # flush to get the datasetUri + + print("Creation of dataset_lock table is done") + + session.commit() + + except Exception as ex: + print(f"Failed to execute the migration script due to: {ex}") + raise ex + + +def downgrade(): + try: + bind = op.get_bind() + session = orm.Session(bind=bind) + + print("Dropping dataset_lock table") + + op.drop_table('dataset_lock') + + print("Dropping of dataset_lock table is done") + session.commit() + + except Exception as ex: + print(f"Failed to execute the migration script due to: {ex}") + raise ex