diff --git a/lakecli/iam/orm.py b/lakecli/iam/orm.py
index 4519796..f9176f5 100644
--- a/lakecli/iam/orm.py
+++ b/lakecli/iam/orm.py
@@ -9,6 +9,40 @@ class Meta:
         database = database_proxy
 
 
+class Schema(BaseModel):
+    class Meta:
+        table_name = 'schemata'
+
+    id = AutoField()
+    schema_name = TextField()
+    location = TextField(null=True)
+
+
+class Table(BaseModel):
+    class Meta:
+        table_name = 'tables'
+
+    id = AutoField()
+    table_schema = TextField()
+    table_name = TextField()
+    create_time = TextField(null=True)
+    last_access_time = TextField(null=True)
+
+
+class Column(BaseModel):
+    class Meta:
+        table_name = 'columns'
+
+    id = AutoField()
+    table_schema = TextField()
+    table_name = TextField()
+    column_name = TextField()
+    data_type = TextField()
+    is_partition_column = BooleanField()
+    pii = TextField(null=True)
+    ordinal = IntegerField()
+
+
 class DatabasePrivilege(BaseModel):
     class Meta:
         table_name = 'database_privileges'
@@ -36,7 +70,7 @@ def init(path):
     database = SqliteDatabase(os.path.expanduser(path))
     database_proxy.initialize(database)
     database_proxy.connect()
-    database_proxy.create_tables([DatabasePrivilege, TablePrivilege])
+    database_proxy.create_tables([DatabasePrivilege, TablePrivilege, Schema, Table, Column])
 
     return database_proxy
 
diff --git a/lakecli/iam/scanner.py b/lakecli/iam/scanner.py
index 5789619..5d8e85c 100644
--- a/lakecli/iam/scanner.py
+++ b/lakecli/iam/scanner.py
@@ -2,8 +2,7 @@
 import boto3
 import logging
 
-from lakecli.iam.orm import TablePrivilege, DatabasePrivilege, init, model_db_close
-
+from lakecli.iam.orm import TablePrivilege, DatabasePrivilege, init, model_db_close, Table, Schema, Column
 
 LOGGER = logging.getLogger(__name__)
 
@@ -16,15 +15,43 @@ def __init__(self, aws_config, path):
             LOGGER.info("Remove old sqlite database at %s" % self.path)
             os.remove(self.path)
 
+    @staticmethod
+    def parse_columns(column_list, ordinal_no, is_partition_col, schema_name, table_name):
+        LOGGER.debug(column_list)
+        LOGGER.info("%d columns found in %s.%s" % (
+            len(column_list), schema_name, table_name))
+        columns = []
+        for c in column_list:
+            pii = c['Parameters']['PII'] if 'Parameters' in c and 'PII' in c['Parameters'] else None
+            columns.append(Column(
+                table_schema=schema_name,
+                table_name=table_name,
+                column_name=c['Name'],
+                data_type=c['Type'],
+                is_partition_column=is_partition_col,
+                pii=pii,
+                ordinal=ordinal_no
+            ))
+            ordinal_no += 1
+
+        return columns, ordinal_no
+
     def scan(self):
         lake_client = boto3.client('lakeformation',
                                    region_name=self.aws_config.region,
                                    aws_access_key_id=self.aws_config.aws_access_key_id,
                                    aws_secret_access_key=self.aws_config.aws_secret_access_key)
 
-        table_permissions = lake_client.list_permissions(
-            ResourceType='TABLE',
-        )
+        table_permissions = []
+        next_token = ''
+
+        while next_token is not None:
+            result = lake_client.list_permissions(
+                ResourceType='TABLE',
+                NextToken=next_token
+            )
+            table_permissions += result['PrincipalResourcePermissions']
+            next_token = result['NextToken'] if 'NextToken' in result else None
 
         LOGGER.debug(table_permissions)
 
@@ -34,8 +61,8 @@ def scan(self):
         LOGGER.debug(database_permissions)
 
         table_privileges = []
-        LOGGER.info("%d table permissions found." % len(table_permissions['PrincipalResourcePermissions']))
-        for resource in table_permissions['PrincipalResourcePermissions']:
+        LOGGER.info("%d table permissions found." % len(table_permissions))
+        for resource in table_permissions:
             for permission in resource['Permissions']:
                 grant = permission in resource['PermissionsWithGrantOption']
                 table_privileges.append(self._new_privilege(permission, resource['Resource'],
@@ -54,18 +81,70 @@
                 permission=permission,
                 grant=grant
             ))
+        glue_client = boto3.client('glue',
+                                   region_name=self.aws_config.region,
+                                   aws_access_key_id=self.aws_config.aws_access_key_id,
+                                   aws_secret_access_key=self.aws_config.aws_secret_access_key)
+
+        schemata = []
+        tables = []
+        columns = []
+
+        schemata_response = glue_client.get_databases()
+        LOGGER.debug(schemata_response)
+        LOGGER.info("%d databases found " % len(schemata_response['DatabaseList']))
+        for d in schemata_response['DatabaseList']:
+
+            schemata.append(Schema(
+                schema_name=d['Name'],
+                location=d['LocationUri'] if 'LocationUri' in d else None
+            ))
+
+            tables_response = glue_client.get_tables(
+                DatabaseName=d['Name']
+            )
+            LOGGER.debug(tables_response)
+            LOGGER.info("%d tables found in %s" % (len(tables_response['TableList']), d['Name']))
+            for t in tables_response['TableList']:
+                tables.append(Table(
+                    table_schema=d['Name'],
+                    table_name=t['Name'],
+                    create_time=t['CreateTime'],
+                    last_access_time=t['LastAccessTime'] if 'LastAccessTime' in t else None,
+                ))
+
+                ordinal_no = 1
+                if 'StorageDescriptor' in t:
+                    for key in ('PartitionKeys', 'Columns'):
+                        if key in t['StorageDescriptor']:
+                            column_list = t['StorageDescriptor'][key]
+                            parsed_columns, ordinal_no = Scanner.parse_columns(column_list,
+                                                                               ordinal_no,
+                                                                               key == 'PartitionKeys',
+                                                                               d['Name'], t['Name'])
+                            columns.extend(parsed_columns)
+
         connection = init(self.path)
         try:
             with connection.atomic():
                 TablePrivilege.bulk_create(table_privileges, batch_size=100)
                 DatabasePrivilege.bulk_create(db_privileges, batch_size=100)
+                Schema.bulk_create(schemata, batch_size=100)
+                Table.bulk_create(tables, batch_size=100)
+                Column.bulk_create(columns, batch_size=100)
+
         finally:
             model_db_close()
 
     @staticmethod
     def _principal(arn):
         # http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html
+        LOGGER.debug(arn)
         elements = arn.split(':', 5)
+        if len(elements) != 6:
+            LOGGER.warning("Unknown format for arn '%s'" % arn)
+            return arn
+
         result = {
             'arn': elements[0],
             'partition': elements[1],
diff --git a/test/test_scanner.py b/test/test_scanner.py
index 60ddb98..4a6d0a4 100644
--- a/test/test_scanner.py
+++ b/test/test_scanner.py
@@ -1,4 +1,5 @@
 from lakecli.iam.scanner import Scanner
+from unittest import TestCase
 
 
 def test_table_permissions():
@@ -36,3 +37,36 @@ def test_prinicpal():
     principal = Scanner._principal("arn:aws:iam::1111111:user/datalake_user")
 
     assert principal == "user/datalake_user"
+
+
+class ScannerTests(TestCase):
+
+# database_list = {'DatabaseList': [{'Name': 'default', 'CreateTime': datetime.datetime(2019, 10, 27, 10, 39, 58, tzinfo=tzlocal()), 'CreateTableDefaultPermissions': [{'Principal': {'DataLakePrincipalIdentifier': 'IAM_ALLOWED_PRINCIPALS'}, 'Permissions': ['ALL']}]}, {'Name': 'sampledb', 'Description': 'Sample database', 'Parameters': {'CreatedBy': 'Athena', 'EXTERNAL': 'TRUE'}, 'CreateTime': datetime.datetime(2019, 10, 26, 10, 23, 10, tzinfo=tzlocal()), 'CreateTableDefaultPermissions': [{'Principal': {'DataLakePrincipalIdentifier': 'IAM_ALLOWED_PRINCIPALS'}, 'Permissions': ['ALL']}]}, {'Name': 'taxidata', 'CreateTime': datetime.datetime(2019, 11, 26, 12, 17, 49, tzinfo=tzlocal()), 'CreateTableDefaultPermissions': [{'Principal': {'DataLakePrincipalIdentifier': 'IAM_ALLOWED_PRINCIPALS'}, 'Permissions': ['ALL']}]}, {'Name': 'taxilake', 'LocationUri': 's3://tokerndev-datalake', 'CreateTime': datetime.datetime(2019, 11, 26, 21, 45, 50, tzinfo=tzlocal()), 'CreateTableDefaultPermissions': []}]
+# table_list = {'TableList': [{'Name': 'csv_misc', 'DatabaseName': 'taxidata', 'Owner': 'owner', 'CreateTime': datetime.datetime(2019, 12, 9, 16, 12, 43, tzinfo=tzlocal()), 'UpdateTime': datetime.datetime(2019, 12, 9, 17, 33, 20, tzinfo=tzlocal()), 'LastAccessTime': datetime.datetime(2019, 12, 9, 16, 12, 43, tzinfo=tzlocal()), 'Retention': 0, 'StorageDescriptor': {'Columns': [{'Name': 'locationid', 'Type': 'bigint'}, {'Name': 'borough', 'Type': 'string', 'Parameters': {'PII': 'PiiTypes.ADDRESS'}}, {'Name': 'zone', 'Type': 'string', 'Parameters': {'PII': 'PiiTypes.ADDRESS'}}, {'Name': 'service_zone', 'Type': 'string', 'Parameters': {'PII': 'PiiTypes.ADDRESS'}}], 'Location': 's3://nyc-tlc/misc/', 'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat', 'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', 'Compressed': False, 'NumberOfBuckets': -1, 'SerdeInfo': {'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe', 'Parameters': {'field.delim': ','}}, 'BucketColumns': [], 'SortColumns': [], 'Parameters': {'CrawlerSchemaDeserializerVersion': '1.0', 'CrawlerSchemaSerializerVersion': '1.0', 'UPDATED_BY_CRAWLER': 'TaxiCrawler', 'areColumnsQuoted': 'false', 'averageRecordSize': '36', 'classification': 'csv', 'columnsOrdered': 'true', 'compressionType': 'none', 'delimiter': ',', 'exclusions': '["s3://nyc-tlc/misc/*foil*","s3://nyc-tlc/misc/shared*","s3://nyc-tlc/misc/uber*","s3://nyc-tlc/misc/*.html","s3://nyc-tlc/misc/*.zip","s3://nyc-tlc/misc/FOIL_*"]', 'objectCount': '1', 'recordCount': '342', 'sizeKey': '12322', 'skip.header.line.count': '1', 'typeOfData': 'file'}, 'StoredAsSubDirectories': False}, 'PartitionKeys': [], 'TableType': 'EXTERNAL_TABLE', 'Parameters': {'CrawlerSchemaDeserializerVersion': '1.0', 'CrawlerSchemaSerializerVersion': '1.0', 'UPDATED_BY_CRAWLER': 'TaxiCrawler', 'areColumnsQuoted': 'false', 'averageRecordSize': '36', 'classification': 'csv', 'columnsOrdered': 'true', 'compressionType': 'none', 'delimiter': ',', 'exclusions': '["s3://nyc-tlc/misc/*foil*","s3://nyc-tlc/misc/shared*","s3://nyc-tlc/misc/uber*","s3://nyc-tlc/misc/*.html","s3://nyc-tlc/misc/*.zip","s3://nyc-tlc/misc/FOIL_*"]', 'objectCount': '1', 'recordCount': '342', 'sizeKey': '12322', 'skip.header.line.count': '1', 'typeOfData': 'file'}, 'IsRegisteredWithLakeFormation': False}, {'Name': 'csv_trip_data', 'DatabaseName': 'taxidata', 'Owner': 'owner', 'CreateTime': datetime.datetime(2019, 12, 9, 16, 12, 43, tzinfo=tzlocal()), 'UpdateTime': datetime.datetime(2019, 12, 9, 16, 12, 43, tzinfo=tzlocal()), 'LastAccessTime': datetime.datetime(2019, 12, 9, 16, 12, 43, tzinfo=tzlocal()), 'Retention': 0, 'StorageDescriptor': {'Columns': [{'Name': 'dispatching_base_num', 'Type': 'string'}, {'Name': 'pickup_datetime', 'Type': 'string'}, {'Name': 'dropoff_datetime', 'Type': 'string'}, {'Name': 'pulocationid', 'Type': 'bigint'}, {'Name': 'dolocationid', 'Type': 'bigint'}, {'Name': 'sr_flag', 'Type': 'bigint'}, {'Name': 'hvfhs_license_num', 'Type': 'string'}], 'Location': 's3://nyc-tlc/trip data/', 'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat', 'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', 'Compressed': False, 'NumberOfBuckets': -1, 'SerdeInfo': {'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe', 'Parameters': {'field.delim': ','}}, 'BucketColumns': [], 'SortColumns': [], 'Parameters': {'CrawlerSchemaDeserializerVersion': '1.0', 'CrawlerSchemaSerializerVersion': '1.0', 'UPDATED_BY_CRAWLER': 'TaxiCrawler', 'areColumnsQuoted': 'false', 'averageRecordSize': '70', 'classification': 'csv', 'columnsOrdered': 'true', 'compressionType': 'none', 'delimiter': ',', 'exclusions': '["s3://nyc-tlc/trip data/fhv_tripdata_2015*","s3://nyc-tlc/trip data/fhv_tripdata_2016*","s3://nyc-tlc/trip data/fhv_tripdata_2017*","s3://nyc-tlc/trip data/fhv_tripdata_2018*","s3://nyc-tlc/trip data/yellow*","s3://nyc-tlc/trip data/green*"]', 'objectCount': '11', 'recordCount': '120729113', 'sizeKey': '8731786276', 'skip.header.line.count': '1', 'typeOfData': 'file'}, 'StoredAsSubDirectories': False}, 'PartitionKeys': [], 'TableType': 'EXTERNAL_TABLE', 'Parameters': {'CrawlerSchemaDeserializerVersion': '1.0', 'CrawlerSchemaSerializerVersion': '1.0', 'UPDATED_BY_CRAWLER': 'TaxiCrawler', 'areColumnsQuoted': 'false', 'averageRecordSize': '70', 'classification': 'csv', 'columnsOrdered': 'true', 'compressionType': 'none', 'delimiter': ',', 'exclusions': '["s3://nyc-tlc/trip data/fhv_tripdata_2015*","s3://nyc-tlc/trip data/fhv_tripdata_2016*","s3://nyc-tlc/trip data/fhv_tripdata_2017*","s3://nyc-tlc/trip data/fhv_tripdata_2018*","s3://nyc-tlc/trip data/yellow*","s3://nyc-tlc/trip data/green*"]', 'objectCount': '11', 'recordCount': '120729113', 'sizeKey': '8731786276', 'skip.header.line.count': '1', 'typeOfData': 'file'}, 'CreatedBy': 'arn:aws:sts::172965158661:assumed-role/LakeFormationWorkflowRole/AWS-Crawler', 'IsRegisteredWithLakeFormation': False}]
+
+    def test_parse_columns(self):
+        column_list = [
+            {'Name': 'locationid', 'Type': 'bigint'},
+            {'Name': 'borough', 'Type': 'string', 'Parameters': {'PII': 'PiiTypes.ADDRESS'}},
+            {'Name': 'zone', 'Type': 'string', 'Parameters': {'PII': 'PiiTypes.ADDRESS'}},
+            {'Name': 'service_zone', 'Type': 'string', 'Parameters': {'PII': 'PiiTypes.ADDRESS'}}
+        ]
+
+        ordinal_number = 1
+        parsed_columns, ordinal_number = Scanner.parse_columns(column_list, ordinal_number, False, 'schema', 'table')
+
+        self.assertEqual(5, ordinal_number)
+
+        self.assertEqual("schema", parsed_columns[0].table_schema)
+        self.assertEqual("table", parsed_columns[0].table_name)
+        self.assertEqual(1, parsed_columns[0].ordinal)
+        self.assertEqual("locationid", parsed_columns[0].column_name)
+        self.assertEqual("bigint", parsed_columns[0].data_type)
+        self.assertIsNone(parsed_columns[0].pii)
+
+        self.assertEqual("schema", parsed_columns[1].table_schema)
+        self.assertEqual("table", parsed_columns[1].table_name)
+        self.assertEqual(2, parsed_columns[1].ordinal)
+        self.assertEqual("borough", parsed_columns[1].column_name)
+        self.assertEqual("string", parsed_columns[1].data_type)
+        self.assertEqual("PiiTypes.ADDRESS", parsed_columns[1].pii)