827 share verify and reapply fix (data-dot-all#1062)
### Feature or Bugfix
<!-- please choose -->
- Feature


### Detail
- Add the ability to verify the health status of share requests
- Add the ability to re-apply share request processing for items in an
unhealthy state
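
As a rough illustration of how a client might call the new verify mutation, the sketch below builds a GraphQL request body around the `ShareItemSelectorInput` shape (`shareUri`, `itemUris`) introduced in this change. The helper name `build_item_selector_request`, the example URIs, and the selected return field `shareUri` are assumptions made for illustration; they are not taken from the diff.

```python
import json
from typing import List

# Mutation document. The mutation and input type names come from this change;
# the selected return field (shareUri) is an assumption about ShareObject.
VERIFY_ITEMS_MUTATION = """
mutation verifyItemsShareObject($input: ShareItemSelectorInput) {
  verifyItemsShareObject(input: $input) {
    shareUri
  }
}
"""


def build_item_selector_request(query: str, share_uri: str, item_uris: List[str]) -> str:
    """Serialize a GraphQL request whose input matches ShareItemSelectorInput."""
    payload = {
        "query": query,
        "variables": {"input": {"shareUri": share_uri, "itemUris": item_uris}},
    }
    return json.dumps(payload)


# Hypothetical share and item URIs, used only to show the payload shape.
request_body = build_item_selector_request(
    VERIFY_ITEMS_MUTATION, "share-123", ["item-a", "item-b"]
)
```

Because `revokeItemsShareObject`, `verifyItemsShareObject`, and `reApplyItemsShareObject` all take the same input type, the same helper would presumably serve all three by swapping the mutation document.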

### Relates
- data-dot-all#827 

### Security
Please answer the questions below briefly where applicable, or write
`N/A`. Based on
[OWASP 10](https://owasp.org/Top10/en/).

- Does this PR introduce or modify any input fields or queries - this
includes
fetching data from storage outside the application (e.g. a database, an
S3 bucket)?
  - Is the input sanitized?
- What precautions are you taking before deserializing the data you
consume?
  - Is injection prevented by parametrizing queries?
  - Have you ensured no `eval` or similar functions are used?
- Does this PR introduce any functionality or component that requires
authorization?
- How have you ensured it respects the existing AuthN/AuthZ mechanisms?
  - Are you logging failed auth attempts?
- Are you using or adding any cryptographic features?
  - Do you use a standard proven implementations?
  - Are the used keys controlled by the customer? Where are they stored?
- Are you introducing any new policies/roles/users?
  - Have you used the least-privilege principle? How?




### Pending TODO

- [x] ECS Scheduled Task to Verify Shares
- [x] Write Unit Tests
- [x] Add Verify On Dataset Level for All Shared Items in Associated
Share Objects


### Things to Consider

- ECS Scheduled Task to be Weekly (default)
  - Open to discussion on whether weekly is the best default schedule or
    whether a different interval is better
  - Alternatively, we can introduce a custom scheduler parameter in
    `config.json`

- Currently the share item health columns (`healthStatus`, `healthMessage`,
  `lastVerifiedTimestamp`) are part of the same `share_object_item`
  table, which makes it very easy to get the latest health of each share item
  - Take a look at the comment on [Issue
    827](data-dot-all#827 (comment))
    for more information about the lifecycle of these health columns and how
    they are updated
  - Alternatively, we could create a new `share_object_item_health` table
    that tracks every verify attempt, plus a column in `share_object_item`
    that points to the primary key of the `share_object_item_health` record
    with the latest health information (updated on each verification run).
    This would persist each verification run for auditability and
    traceability
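
To make the alternative design above more concrete, here is a minimal sketch of a history table plus a "latest" pointer, using an in-memory SQLite database. Only the table names `share_object_item` and `share_object_item_health` come from the description; the column names (`latestHealthId`, `healthId`, `verifiedAt`), the status strings, and the helper `record_verification` are hypothetical and not part of this PR.

```python
import sqlite3
from datetime import datetime, timezone


def record_verification(conn, share_item_uri, status, message=None):
    """Append one verification attempt and point the share item at it."""
    now = datetime.now(timezone.utc).isoformat()
    cur = conn.execute(
        "INSERT INTO share_object_item_health "
        "(shareItemUri, healthStatus, healthMessage, verifiedAt) "
        "VALUES (?, ?, ?, ?)",
        (share_item_uri, status, message, now),
    )
    # The item row only stores a pointer to the most recent health record.
    conn.execute(
        "UPDATE share_object_item SET latestHealthId = ? WHERE shareItemUri = ?",
        (cur.lastrowid, share_item_uri),
    )
    return cur.lastrowid


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE share_object_item (
        shareItemUri TEXT PRIMARY KEY,
        latestHealthId INTEGER
    );
    CREATE TABLE share_object_item_health (
        healthId INTEGER PRIMARY KEY AUTOINCREMENT,
        shareItemUri TEXT NOT NULL,
        healthStatus TEXT NOT NULL,
        healthMessage TEXT,
        verifiedAt TEXT NOT NULL
    );
    """
)
conn.execute("INSERT INTO share_object_item (shareItemUri) VALUES ('item-1')")

# Two verification runs: the first finds a problem, the second is clean.
record_verification(conn, "item-1", "Unhealthy", "missing grant")
latest_id = record_verification(conn, "item-1", "Healthy")
```

The trade-off is the one the bullet describes: every run stays queryable for audits, at the cost of a join (or a second lookup) to read the current health of an item.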





By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.

---------

Co-authored-by: dlpzx <[email protected]>
Co-authored-by: Anushka Singh <[email protected]>
Co-authored-by: mourya-33 <[email protected]>
Co-authored-by: Tejas Rajopadhye <[email protected]>
Co-authored-by: trajopadhye <[email protected]>
Co-authored-by: Zilvinas Saltys <[email protected]>
Co-authored-by: Mourya Darivemula <[email protected]>
Co-authored-by: Sofia Sazonova <[email protected]>
Co-authored-by: Sofia Sazonova <[email protected]>
Co-authored-by: Balint David <[email protected]>
Co-authored-by: Petros Kalos <[email protected]>
12 people authored Feb 29, 2024
1 parent 7aef3ad commit 45943a9
Showing 47 changed files with 4,062 additions and 1,899 deletions.
7 changes: 4 additions & 3 deletions backend/dataall/modules/dataset_sharing/api/input_types.py
@@ -25,11 +25,11 @@
)


RevokeItemsInput = gql.InputType(
name='RevokeItemsInput',
ShareItemSelectorInput = gql.InputType(
name='ShareItemSelectorInput',
arguments=[
gql.Argument(name='shareUri', type=gql.NonNullableType(gql.String)),
gql.Argument(name='revokedItemUris', type=gql.NonNullableType(gql.ArrayType(gql.String))),
gql.Argument(name='itemUris', type=gql.NonNullableType(gql.ArrayType(gql.String))),
],
)

@@ -69,6 +69,7 @@
gql.Argument('tags', gql.ArrayType(gql.String)),
gql.Argument(name='isShared', type=gql.Boolean),
gql.Argument(name='isRevokable', type=gql.Boolean),
gql.Argument(name='isHealthy', type=gql.Boolean),
gql.Argument('page', gql.Integer),
gql.Argument('pageSize', gql.Integer),
],
16 changes: 15 additions & 1 deletion backend/dataall/modules/dataset_sharing/api/mutations.py
@@ -67,11 +67,25 @@

revokeItemsShareObject = gql.MutationField(
name='revokeItemsShareObject',
args=[gql.Argument(name='input', type=gql.Ref('RevokeItemsInput'))],
args=[gql.Argument(name='input', type=gql.Ref('ShareItemSelectorInput'))],
type=gql.Ref('ShareObject'),
resolver=revoke_items_share_object,
)

verifyItemsShareObject = gql.MutationField(
name='verifyItemsShareObject',
args=[gql.Argument(name='input', type=gql.Ref('ShareItemSelectorInput'))],
type=gql.Ref('ShareObject'),
resolver=verify_items_share_object,
)

reApplyItemsShareObject = gql.MutationField(
name='reApplyItemsShareObject',
args=[gql.Argument(name='input', type=gql.Ref('ShareItemSelectorInput'))],
type=gql.Ref('ShareObject'),
resolver=reapply_items_share_object,
)

updateShareRejectReason = gql.MutationField(
name='updateShareRejectReason',
args=[
48 changes: 39 additions & 9 deletions backend/dataall/modules/dataset_sharing/api/resolvers.py
@@ -16,6 +16,28 @@
log = logging.getLogger(__name__)


class RequestValidator:
@staticmethod
def validate_creation_request(data):
if not data:
raise RequiredParameter(data)
if not data.get('principalId'):
raise RequiredParameter('principalId')
if not data.get('principalType'):
raise RequiredParameter('principalType')
if not data.get('groupUri'):
raise RequiredParameter('groupUri')

@staticmethod
def validate_item_selector_input(data):
if not data:
raise RequiredParameter(data)
if not data.get('shareUri'):
raise RequiredParameter('shareUri')
if not data.get('itemUris'):
raise RequiredParameter('itemUris')


def create_share_object(
context: Context,
source,
@@ -24,14 +46,7 @@ def create_share_object(
itemType: str = None,
input: dict = None,
):
if not input:
raise RequiredParameter(input)
if 'principalId' not in input:
raise RequiredParameter('principalId')
if 'principalType' not in input:
raise RequiredParameter('principalType')
if 'groupUri' not in input:
raise RequiredParameter('groupUri')
RequestValidator.validate_creation_request(input)

return ShareObjectService.create_share_object(
uri=input['environmentUri'],
@@ -58,11 +73,26 @@ def reject_share_object(context: Context, source, shareUri: str = None, rejectPu


def revoke_items_share_object(context: Context, source, input):
RequestValidator.validate_item_selector_input(input)
share_uri = input.get("shareUri")
revoked_uris = input.get("revokedItemUris")
revoked_uris = input.get("itemUris")
return ShareItemService.revoke_items_share_object(uri=share_uri, revoked_uris=revoked_uris)


def verify_items_share_object(context: Context, source, input):
RequestValidator.validate_item_selector_input(input)
share_uri = input.get("shareUri")
verify_item_uris = input.get("itemUris")
return ShareItemService.verify_items_share_object(uri=share_uri, item_uris=verify_item_uris)


def reapply_items_share_object(context: Context, source, input):
RequestValidator.validate_item_selector_input(input)
share_uri = input.get("shareUri")
reapply_item_uris = input.get("itemUris")
return ShareItemService.reapply_items_share_object(uri=share_uri, item_uris=reapply_item_uris)


def delete_share_object(context: Context, source, shareUri: str = None):
return ShareObjectService.delete_share_object(uri=shareUri)
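
The validation added in `RequestValidator` checks with `data.get(...)` rather than key membership, so empty values (for example an empty `itemUris` list) are rejected as well as absent keys. The following self-contained sketch illustrates that behavior. `RequiredParameter` here is a stand-in class written for this example, not data.all's real exception, `missing_parameter` is a hypothetical helper, and the sketch raises with the literal name `'input'` for an empty payload instead of passing the empty value itself as the diff does.

```python
class RequiredParameter(Exception):
    """Stand-in for data.all's exception, defined here so the sketch runs alone."""

    def __init__(self, param_name):
        super().__init__(f"A required parameter {param_name} is missing")
        self.param_name = param_name


def validate_item_selector_input(data):
    """Mirror of the item-selector checks: falsy values count as missing."""
    if not data:
        raise RequiredParameter("input")
    if not data.get("shareUri"):
        raise RequiredParameter("shareUri")
    if not data.get("itemUris"):
        raise RequiredParameter("itemUris")


def missing_parameter(data):
    """Hypothetical helper: return the first missing parameter name, or None."""
    try:
        validate_item_selector_input(data)
    except RequiredParameter as error:
        return error.param_name
    return None
```

With these definitions, `missing_parameter({'shareUri': 's-1', 'itemUris': []})` reports `itemUris`, which a plain `'itemUris' not in input` check would have let through.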

5 changes: 4 additions & 1 deletion backend/dataall/modules/dataset_sharing/api/types.py
@@ -1,5 +1,5 @@
from dataall.base.api import gql
from dataall.modules.dataset_sharing.services.dataset_sharing_enums import ShareableType, PrincipalType
from dataall.modules.dataset_sharing.services.dataset_sharing_enums import ShareableType, PrincipalType, ShareItemHealthStatus
from dataall.modules.dataset_sharing.api.resolvers import union_resolver, resolve_shared_item, resolve_dataset, \
resolve_consumption_data, resolve_existing_shared_items, resolve_share_object_statistics, resolve_principal, \
resolve_group, list_shareable_objects, resolve_user_role, resolve_shared_database_name
@@ -23,6 +23,9 @@
gql.Field('itemType', ShareableType.toGraphQLEnum()),
gql.Field('itemName', gql.String),
gql.Field('description', gql.String),
gql.Field('healthStatus', ShareItemHealthStatus.toGraphQLEnum()),
gql.Field('healthMessage', gql.String),
gql.Field('lastVerificationTime', gql.String),
gql.Field(
name='sharedObject',
type=gql.Ref('ShareableObject'),
2 changes: 1 addition & 1 deletion backend/dataall/modules/dataset_sharing/aws/glue_client.py
@@ -91,7 +91,7 @@ def table_exists(self, table_name):
CatalogId=self._account_id, DatabaseName=self._database, Name=table_name
)
)
log.info(f'Glue table {table_name} not found in account {self._account_id} in database {self._database}')
log.info(f'Glue table {table_name} found in account {self._account_id} in database {self._database}')
return table
except ClientError:
log.info(f'Glue table not found: {table_name}')
169 changes: 144 additions & 25 deletions backend/dataall/modules/dataset_sharing/aws/lakeformation_client.py
@@ -41,6 +41,7 @@ def grant_permissions_to_table(
permissions,
permissions_with_grant_options=None,
) -> True:

resource = {
'Table': {
'DatabaseName': database_name,
@@ -99,34 +100,18 @@ def _grant_permissions_to_resource(
) -> True:
for principal in principals:
try:
log.info(
f'Granting principal {principal} '
f'permissions {permissions} '
f'to {str(resource)}...'
)
check_dict = dict(
Principal={'DataLakePrincipalIdentifier': principal},
Resource=check_resource if check_resource else resource
)
existing = self._client.list_permissions(**check_dict)
current = []
current_grant = []
for permission in existing['PrincipalResourcePermissions']:
current.extend(permission["Permissions"])
current_grant.extend(permission["PermissionsWithGrantOption"])

missing_permissions = list(set(permissions) - set(current))
missing_grant_permissions = list(set(permissions_with_grant_options) - set(current_grant)) if permissions_with_grant_options else []

if not missing_permissions and not missing_grant_permissions:
if not self._check_permissions_to_resource(
principal=principal,
resource=resource,
permissions=permissions,
permissions_with_grant_options=permissions_with_grant_options,
check_resource=check_resource
):
log.info(
f'Already granted principal {principal} '
f'Granting principal {principal} '
f'permissions {permissions} '
f'and permissions with grant options {permissions_with_grant_options} '
f'to {str(resource)} '
f'response: {existing}'
f'to {str(resource)}...'
)
else:
# We define the grant with "permissions" instead of "missing_permissions" because we want to avoid
# duplicates done by data.all, but we want to avoid dependencies with external grants
grant_dict = dict(
@@ -149,6 +134,7 @@ def _grant_permissions_to_resource(
f'response: {response}'
)
time.sleep(2)

except ClientError as e:
log.error(
f'Could not grant principal {principal} '
@@ -285,3 +271,136 @@ def _revoke_permissions_from_resource(
f'response error: {error}'
)
return True

def check_permissions_to_database(
self,
principals,
database_name,
permissions,
) -> True:
resource = {
'Database': {'Name': database_name},
}
check = []
for principal in principals:
check.append(
self._check_permissions_to_resource(
principal=principal,
resource=resource,
permissions=permissions
)
)
return all(check)

def check_permissions_to_table(
self,
principals,
database_name,
table_name,
catalog_id,
permissions,
permissions_with_grant_options=None,
) -> True:
resource = {
'Table': {
'DatabaseName': database_name,
'Name': table_name,
'CatalogId': catalog_id,
}
}
check = []
for principal in principals:
check.append(
self._check_permissions_to_resource(
principal=principal,
resource=resource,
permissions=permissions,
permissions_with_grant_options=permissions_with_grant_options
)
)
return all(check)

def check_permissions_to_table_with_columns(
self,
principals,
database_name,
table_name,
catalog_id,
permissions,
permissions_with_grant_options=None,
) -> True:
resource = {
'TableWithColumns': {
'DatabaseName': database_name,
'Name': table_name,
'ColumnWildcard': {},
'CatalogId': catalog_id,
}
}
check_resource = {
'Table': {
'DatabaseName': database_name,
'Name': table_name,
'CatalogId': catalog_id,
}
}
check = []
for principal in principals:
check.append(
self._check_permissions_to_resource(
principal=principal,
resource=resource,
permissions=permissions,
permissions_with_grant_options=permissions_with_grant_options,
check_resource=check_resource
)
)
return all(check)

def _check_permissions_to_resource(
self,
principal: str,
resource: dict,
permissions: List,
permissions_with_grant_options: List = None,
check_resource: dict = None
) -> bool:
try:
log.info(
f'Checking principal {principal} '
f'permissions {permissions} '
f'to {str(resource)}...'
)
check_dict = dict(
Principal={'DataLakePrincipalIdentifier': principal},
Resource=check_resource if check_resource else resource
)
existing = self._client.list_permissions(**check_dict)
current = []
current_grant = []
for permission in existing['PrincipalResourcePermissions']:
current.extend(permission["Permissions"])
current_grant.extend(permission["PermissionsWithGrantOption"])

missing_permissions = list(set(permissions) - set(current))
missing_grant_permissions = list(set(permissions_with_grant_options) - set(current_grant)) if permissions_with_grant_options else []

if missing_permissions or missing_grant_permissions:
return False
else:
log.info(
f'Already granted principal {principal} '
f'permissions {permissions} '
f'and permissions with grant options {permissions_with_grant_options} '
f'to {str(resource)} '
f'response: {existing}'
)
return True
except ClientError as e:
log.error(
f'Could not list principal {principal} '
f'permissions {permissions} '
f'to {str(resource)} '
f'due to: {e}'
)
raise e
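
The core of `_check_permissions_to_resource` is a set difference between the requested permissions and those returned by `list_permissions`. The sketch below isolates that logic so it can run without AWS credentials. `FakeLakeFormationClient` and `has_permissions` are hypothetical names introduced for this example; the response shape (`PrincipalResourcePermissions` entries with `Permissions` and `PermissionsWithGrantOption`) follows what the diff reads from the real client. Pagination of `list_permissions` results is ignored here, as it is in the diff.

```python
from typing import List, Optional


class FakeLakeFormationClient:
    """Test double that returns a canned list_permissions response."""

    def __init__(self, entries):
        self._entries = entries

    def list_permissions(self, **kwargs):
        return {"PrincipalResourcePermissions": self._entries}


def has_permissions(
    client,
    principal: str,
    resource: dict,
    permissions: List[str],
    permissions_with_grant_options: Optional[List[str]] = None,
) -> bool:
    """Return True only if every requested permission is already granted."""
    existing = client.list_permissions(
        Principal={"DataLakePrincipalIdentifier": principal},
        Resource=resource,
    )
    current, current_grant = set(), set()
    for entry in existing["PrincipalResourcePermissions"]:
        current.update(entry.get("Permissions", []))
        current_grant.update(entry.get("PermissionsWithGrantOption", []))

    # Anything requested but not present counts as missing.
    missing = set(permissions) - current
    missing_grant = set(permissions_with_grant_options or []) - current_grant
    return not missing and not missing_grant


# Hypothetical principal and table, used only to exercise the check.
client = FakeLakeFormationClient(
    [{"Permissions": ["DESCRIBE"], "PermissionsWithGrantOption": []}]
)
table = {"Table": {"DatabaseName": "db", "Name": "tbl", "CatalogId": "111122223333"}}
describe_ok = has_permissions(client, "arn:aws:iam::111122223333:role/example", table, ["DESCRIBE"])
select_ok = has_permissions(client, "arn:aws:iam::111122223333:role/example", table, ["SELECT"])
```

Returning a plain boolean is what lets the same check back both the grant path (skip the grant if nothing is missing) and the new verify path (mark the item unhealthy if something is).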