new:usr:Add schemas, tables and columns in information schema (#12)
The information schema also records PII annotations found in Glue column parameters.
vrajat authored Dec 10, 2019
1 parent 2a5d103 commit 70d7185
Showing 3 changed files with 155 additions and 8 deletions.
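
In practice, this commit turns the lakecli SQLite file into a small information schema that can be queried for PII directly. A minimal sketch, assuming a scan has already populated a database file (the `lakecli.db` path is illustrative, not fixed by the commit):

import sqlite3

# List every column the scanner tagged with a PII annotation. The
# `columns` table and its fields match the models added in
# lakecli/iam/orm.py below.
conn = sqlite3.connect('lakecli.db')  # illustrative path
rows = conn.execute(
    "SELECT table_schema, table_name, column_name, pii "
    "FROM columns WHERE pii IS NOT NULL "
    "ORDER BY table_schema, table_name, ordinal")
for schema, table, column, pii in rows:
    print("%s.%s.%s: %s" % (schema, table, column, pii))
conn.close()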
36 changes: 35 additions & 1 deletion lakecli/iam/orm.py
@@ -9,6 +9,40 @@ class Meta:
        database = database_proxy


class Schema(BaseModel):
    class Meta:
        table_name = 'schemata'

    id = AutoField()
    schema_name = TextField()
    location = TextField(null=True)


class Table(BaseModel):
    class Meta:
        table_name = 'tables'

    id = AutoField()
    table_schema = TextField()
    table_name = TextField()
    create_time = TextField(null=True)
    last_access_time = TextField(null=True)


class Column(BaseModel):
    class Meta:
        table_name = 'columns'

    id = AutoField()
    table_schema = TextField()
    table_name = TextField()
    column_name = TextField()
    data_type = TextField()
    is_partition_column = BooleanField()
    pii = TextField(null=True)
    ordinal = IntegerField()


class DatabasePrivilege(BaseModel):
    class Meta:
        table_name = 'database_privileges'
@@ -36,7 +70,7 @@ def init(path):
    database = SqliteDatabase(os.path.expanduser(path))
    database_proxy.initialize(database)
    database_proxy.connect()
-    database_proxy.create_tables([DatabasePrivilege, TablePrivilege])
+    database_proxy.create_tables([DatabasePrivilege, TablePrivilege, Schema, Table, Column])

    return database_proxy

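Once `init()` binds the proxy, the same data is reachable through the new peewee models as well as raw SQL. A minimal sketch, assuming a previously scanned database (the path is hypothetical):

from peewee import fn

from lakecli.iam.orm import Column, init, model_db_close

connection = init('~/.lakecli/iam.db')  # hypothetical path
try:
    # Count PII-annotated columns per table via the new Column model.
    pii_counts = (Column
                  .select(Column.table_schema, Column.table_name,
                          fn.COUNT(Column.id).alias('num_pii_columns'))
                  .where(Column.pii.is_null(False))
                  .group_by(Column.table_schema, Column.table_name))
    for row in pii_counts:
        print(row.table_schema, row.table_name, row.num_pii_columns)
finally:
    model_db_close()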
93 changes: 86 additions & 7 deletions lakecli/iam/scanner.py
@@ -2,8 +2,7 @@
import boto3
import logging

-from lakecli.iam.orm import TablePrivilege, DatabasePrivilege, init, model_db_close
-
+from lakecli.iam.orm import TablePrivilege, DatabasePrivilege, init, model_db_close, Table, Schema, Column

LOGGER = logging.getLogger(__name__)

@@ -16,15 +15,43 @@ def __init__(self, aws_config, path):
LOGGER.info("Remove old sqlite database at %s" % self.path)
os.remove(self.path)

    @staticmethod
    def parse_columns(column_list, ordinal_no, is_partition_col, schema_name, table_name):
        LOGGER.debug(column_list)
        LOGGER.info("%d columns found in %s.%s" % (
            len(column_list), schema_name, table_name))
        columns = []
        for c in column_list:
            pii = c.get('Parameters', {}).get('PII')
            columns.append(Column(
                table_schema=schema_name,
                table_name=table_name,
                column_name=c['Name'],
                data_type=c['Type'],
                is_partition_column=is_partition_col,
                pii=pii,
                ordinal=ordinal_no
            ))
            ordinal_no += 1

        return columns, ordinal_no

    def scan(self):
        lake_client = boto3.client('lakeformation',
                                   region_name=self.aws_config.region,
                                   aws_access_key_id=self.aws_config.aws_access_key_id,
                                   aws_secret_access_key=self.aws_config.aws_secret_access_key)

-        table_permissions = lake_client.list_permissions(
-            ResourceType='TABLE',
-        )
+        table_permissions = []
+        next_token = ''
+
+        while next_token is not None:
+            result = lake_client.list_permissions(
+                ResourceType='TABLE',
+                NextToken=next_token
+            )
+            table_permissions += result['PrincipalResourcePermissions']
+            next_token = result.get('NextToken')

        LOGGER.debug(table_permissions)

@@ -34,8 +61,8 @@ def scan(self):
        LOGGER.debug(database_permissions)

        table_privileges = []
-        LOGGER.info("%d table permissions found." % len(table_permissions['PrincipalResourcePermissions']))
-        for resource in table_permissions['PrincipalResourcePermissions']:
+        LOGGER.info("%d table permissions found." % len(table_permissions))
+        for resource in table_permissions:
            for permission in resource['Permissions']:
                grant = permission in resource['PermissionsWithGrantOption']
                table_privileges.append(self._new_privilege(permission, resource['Resource'],
@@ -54,18 +81,70 @@ def scan(self):
                permission=permission, grant=grant
            ))

        glue_client = boto3.client('glue',
                                   region_name=self.aws_config.region,
                                   aws_access_key_id=self.aws_config.aws_access_key_id,
                                   aws_secret_access_key=self.aws_config.aws_secret_access_key)

        schemata = []
        tables = []
        columns = []

        schemata_response = glue_client.get_databases()
        LOGGER.debug(schemata_response)
        LOGGER.info("%d databases found" % len(schemata_response['DatabaseList']))
        for d in schemata_response['DatabaseList']:
            schemata.append(Schema(
                schema_name=d['Name'],
                location=d.get('LocationUri')
            ))

            tables_response = glue_client.get_tables(
                DatabaseName=d['Name']
            )
            LOGGER.debug(tables_response)
            LOGGER.info("%d tables found in %s" % (len(tables_response['TableList']), d['Name']))
            for t in tables_response['TableList']:
                tables.append(Table(
                    table_schema=d['Name'],
                    table_name=t['Name'],
                    create_time=t['CreateTime'],
                    last_access_time=t.get('LastAccessTime'),
                ))

                ordinal_no = 1
                # PartitionKeys is a top-level attribute of a Glue table,
                # not part of StorageDescriptor.
                if 'PartitionKeys' in t:
                    parsed_columns, ordinal_no = Scanner.parse_columns(
                        t['PartitionKeys'], ordinal_no, True, d['Name'], t['Name'])
                    columns.extend(parsed_columns)

                if 'StorageDescriptor' in t and 'Columns' in t['StorageDescriptor']:
                    parsed_columns, ordinal_no = Scanner.parse_columns(
                        t['StorageDescriptor']['Columns'], ordinal_no, False,
                        d['Name'], t['Name'])
                    columns.extend(parsed_columns)

        connection = init(self.path)
        try:
            with connection.atomic():
                TablePrivilege.bulk_create(table_privileges, batch_size=100)
                DatabasePrivilege.bulk_create(db_privileges, batch_size=100)
                Schema.bulk_create(schemata, batch_size=100)
                Table.bulk_create(tables, batch_size=100)
                Column.bulk_create(columns, batch_size=100)
        finally:
            model_db_close()

    @staticmethod
    def _principal(arn):
        # http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html
        LOGGER.debug(arn)
        elements = arn.split(':', 5)
        if len(elements) != 6:
            LOGGER.warning("Unknown format for arn '%s'" % arn)
            return arn

        result = {
            'arn': elements[0],
            'partition': elements[1],
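Lake Formation and Glue both page their results through NextToken. scan() now loops for list_permissions, but get_databases() and get_tables() above still read only the first page; the same loop, factored into a generic helper, would cover them too. A sketch (the helper name is illustrative, not part of the commit):

def paginate(fetch_page, result_key, **kwargs):
    """Collect every page of a boto3 list/get call that uses NextToken.

    Mirrors the loop added to Scanner.scan(); `fetch_page` is the client
    method and `result_key` the list field in each response.
    """
    items = []
    next_token = None
    while True:
        if next_token is not None:
            kwargs['NextToken'] = next_token
        result = fetch_page(**kwargs)
        items += result[result_key]
        next_token = result.get('NextToken')
        if next_token is None:
            return items


# For example, to read every Glue database rather than only the first page:
# databases = paginate(glue_client.get_databases, 'DatabaseList')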
34 changes: 34 additions & 0 deletions test/test_scanner.py
@@ -1,4 +1,5 @@
from lakecli.iam.scanner import Scanner
from unittest import TestCase


def test_table_permissions():
@@ -36,3 +37,36 @@ def test_prinicpal():
    principal = Scanner._principal("arn:aws:iam::1111111:user/datalake_user")

    assert principal == "user/datalake_user"


class ScannerTests(TestCase):

# database_list = {'DatabaseList': [{'Name': 'default', 'CreateTime': datetime.datetime(2019, 10, 27, 10, 39, 58, tzinfo=tzlocal()), 'CreateTableDefaultPermissions': [{'Principal': {'DataLakePrincipalIdentifier': 'IAM_ALLOWED_PRINCIPALS'}, 'Permissions': ['ALL']}]}, {'Name': 'sampledb', 'Description': 'Sample database', 'Parameters': {'CreatedBy': 'Athena', 'EXTERNAL': 'TRUE'}, 'CreateTime': datetime.datetime(2019, 10, 26, 10, 23, 10, tzinfo=tzlocal()), 'CreateTableDefaultPermissions': [{'Principal': {'DataLakePrincipalIdentifier': 'IAM_ALLOWED_PRINCIPALS'}, 'Permissions': ['ALL']}]}, {'Name': 'taxidata', 'CreateTime': datetime.datetime(2019, 11, 26, 12, 17, 49, tzinfo=tzlocal()), 'CreateTableDefaultPermissions': [{'Principal': {'DataLakePrincipalIdentifier': 'IAM_ALLOWED_PRINCIPALS'}, 'Permissions': ['ALL']}]}, {'Name': 'taxilake', 'LocationUri': 's3://tokerndev-datalake', 'CreateTime': datetime.datetime(2019, 11, 26, 21, 45, 50, tzinfo=tzlocal()), 'CreateTableDefaultPermissions': []}]
# table_list = {'TableList': [{'Name': 'csv_misc', 'DatabaseName': 'taxidata', 'Owner': 'owner', 'CreateTime': datetime.datetime(2019, 12, 9, 16, 12, 43, tzinfo=tzlocal()), 'UpdateTime': datetime.datetime(2019, 12, 9, 17, 33, 20, tzinfo=tzlocal()), 'LastAccessTime': datetime.datetime(2019, 12, 9, 16, 12, 43, tzinfo=tzlocal()), 'Retention': 0, 'StorageDescriptor': {'Columns': [{'Name': 'locationid', 'Type': 'bigint'}, {'Name': 'borough', 'Type': 'string', 'Parameters': {'PII': 'PiiTypes.ADDRESS'}}, {'Name': 'zone', 'Type': 'string', 'Parameters': {'PII': 'PiiTypes.ADDRESS'}}, {'Name': 'service_zone', 'Type': 'string', 'Parameters': {'PII': 'PiiTypes.ADDRESS'}}], 'Location': 's3://nyc-tlc/misc/', 'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat', 'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', 'Compressed': False, 'NumberOfBuckets': -1, 'SerdeInfo': {'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe', 'Parameters': {'field.delim': ','}}, 'BucketColumns': [], 'SortColumns': [], 'Parameters': {'CrawlerSchemaDeserializerVersion': '1.0', 'CrawlerSchemaSerializerVersion': '1.0', 'UPDATED_BY_CRAWLER': 'TaxiCrawler', 'areColumnsQuoted': 'false', 'averageRecordSize': '36', 'classification': 'csv', 'columnsOrdered': 'true', 'compressionType': 'none', 'delimiter': ',', 'exclusions': '["s3://nyc-tlc/misc/*foil*","s3://nyc-tlc/misc/shared*","s3://nyc-tlc/misc/uber*","s3://nyc-tlc/misc/*.html","s3://nyc-tlc/misc/*.zip","s3://nyc-tlc/misc/FOIL_*"]', 'objectCount': '1', 'recordCount': '342', 'sizeKey': '12322', 'skip.header.line.count': '1', 'typeOfData': 'file'}, 'StoredAsSubDirectories': False}, 'PartitionKeys': [], 'TableType': 'EXTERNAL_TABLE', 'Parameters': {'CrawlerSchemaDeserializerVersion': '1.0', 'CrawlerSchemaSerializerVersion': '1.0', 'UPDATED_BY_CRAWLER': 'TaxiCrawler', 'areColumnsQuoted': 'false', 'averageRecordSize': '36', 'classification': 'csv', 'columnsOrdered': 'true', 'compressionType': 'none', 'delimiter': ',', 'exclusions': '["s3://nyc-tlc/misc/*foil*","s3://nyc-tlc/misc/shared*","s3://nyc-tlc/misc/uber*","s3://nyc-tlc/misc/*.html","s3://nyc-tlc/misc/*.zip","s3://nyc-tlc/misc/FOIL_*"]', 'objectCount': '1', 'recordCount': '342', 'sizeKey': '12322', 'skip.header.line.count': '1', 'typeOfData': 'file'}, 'IsRegisteredWithLakeFormation': False}, {'Name': 'csv_trip_data', 'DatabaseName': 'taxidata', 'Owner': 'owner', 'CreateTime': datetime.datetime(2019, 12, 9, 16, 12, 43, tzinfo=tzlocal()), 'UpdateTime': datetime.datetime(2019, 12, 9, 16, 12, 43, tzinfo=tzlocal()), 'LastAccessTime': datetime.datetime(2019, 12, 9, 16, 12, 43, tzinfo=tzlocal()), 'Retention': 0, 'StorageDescriptor': {'Columns': [{'Name': 'dispatching_base_num', 'Type': 'string'}, {'Name': 'pickup_datetime', 'Type': 'string'}, {'Name': 'dropoff_datetime', 'Type': 'string'}, {'Name': 'pulocationid', 'Type': 'bigint'}, {'Name': 'dolocationid', 'Type': 'bigint'}, {'Name': 'sr_flag', 'Type': 'bigint'}, {'Name': 'hvfhs_license_num', 'Type': 'string'}], 'Location': 's3://nyc-tlc/trip data/', 'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat', 'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', 'Compressed': False, 'NumberOfBuckets': -1, 'SerdeInfo': {'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe', 'Parameters': {'field.delim': ','}}, 'BucketColumns': [], 'SortColumns': [], 'Parameters': {'CrawlerSchemaDeserializerVersion': '1.0', 'CrawlerSchemaSerializerVersion': '1.0', 'UPDATED_BY_CRAWLER': 'TaxiCrawler', 
# 'areColumnsQuoted': 'false', 'averageRecordSize': '70', 'classification': 'csv', 'columnsOrdered': 'true', 'compressionType': 'none', 'delimiter': ',', 'exclusions': '["s3://nyc-tlc/trip data/fhv_tripdata_2015*","s3://nyc-tlc/trip data/fhv_tripdata_2016*","s3://nyc-tlc/trip data/fhv_tripdata_2017*","s3://nyc-tlc/trip data/fhv_tripdata_2018*","s3://nyc-tlc/trip data/yellow*","s3://nyc-tlc/trip data/green*"]', 'objectCount': '11', 'recordCount': '120729113', 'sizeKey': '8731786276', 'skip.header.line.count': '1', 'typeOfData': 'file'}, 'StoredAsSubDirectories': False}, 'PartitionKeys': [], 'TableType': 'EXTERNAL_TABLE', 'Parameters': {'CrawlerSchemaDeserializerVersion': '1.0', 'CrawlerSchemaSerializerVersion': '1.0', 'UPDATED_BY_CRAWLER': 'TaxiCrawler', 'areColumnsQuoted': 'false', 'averageRecordSize': '70', 'classification': 'csv', 'columnsOrdered': 'true', 'compressionType': 'none', 'delimiter': ',', 'exclusions': '["s3://nyc-tlc/trip data/fhv_tripdata_2015*","s3://nyc-tlc/trip data/fhv_tripdata_2016*","s3://nyc-tlc/trip data/fhv_tripdata_2017*","s3://nyc-tlc/trip data/fhv_tripdata_2018*","s3://nyc-tlc/trip data/yellow*","s3://nyc-tlc/trip data/green*"]', 'objectCount': '11', 'recordCount': '120729113', 'sizeKey': '8731786276', 'skip.header.line.count': '1', 'typeOfData': 'file'}, 'CreatedBy': 'arn:aws:sts::172965158661:assumed-role/LakeFormationWorkflowRole/AWS-Crawler', 'IsRegisteredWithLakeFormation': False}]

    def test_parse_columns(self):
        column_list = [
            {'Name': 'locationid', 'Type': 'bigint'},
            {'Name': 'borough', 'Type': 'string', 'Parameters': {'PII': 'PiiTypes.ADDRESS'}},
            {'Name': 'zone', 'Type': 'string', 'Parameters': {'PII': 'PiiTypes.ADDRESS'}},
            {'Name': 'service_zone', 'Type': 'string', 'Parameters': {'PII': 'PiiTypes.ADDRESS'}}
        ]

        ordinal_number = 1
        parsed_columns, ordinal_number = Scanner.parse_columns(column_list, ordinal_number, False, 'schema', 'table')

        self.assertEqual(5, ordinal_number)

        self.assertEqual("schema", parsed_columns[0].table_schema)
        self.assertEqual("table", parsed_columns[0].table_name)
        self.assertEqual(1, parsed_columns[0].ordinal)
        self.assertEqual("locationid", parsed_columns[0].column_name)
        self.assertEqual("bigint", parsed_columns[0].data_type)
        self.assertIsNone(parsed_columns[0].pii)

        self.assertEqual("schema", parsed_columns[1].table_schema)
        self.assertEqual("table", parsed_columns[1].table_name)
        self.assertEqual(2, parsed_columns[1].ordinal)
        self.assertEqual("borough", parsed_columns[1].column_name)
        self.assertEqual("string", parsed_columns[1].data_type)
        self.assertEqual("PiiTypes.ADDRESS", parsed_columns[1].pii)
