From 539a56675f699f07e493ca78326953cb7a25cdc5 Mon Sep 17 00:00:00 2001
From: VISHAL KUMAR <110387730+vishalkSimplify@users.noreply.github.com>
Date: Wed, 28 Jun 2023 22:47:06 +0530
Subject: [PATCH] added cache functionality for functions (#11)

* testing with version 1.4.0
* testing with version 1.3.24
* test again
* changed sqlalchemy version
* updated readme and changed version in requirements.txt file
* checking 2.0 version
* reverted back to sqlalchemy 1.4.40 version
* bumped supported version to 1.4.48
* changed version to 1.4
* updated readme.rst
* fixed projection comments
* reconfigured functions based on datahub requirements.
* added cache functionality and fixed integration tests
* Update base.py
* Update test_integration.py
* fixed view and projection lineage
---
 .github/workflows/dialecttest.yml  |    2 +-
 README.md                          |    2 +-
 README.rst                         |    2 +-
 requirements.txt                   |    2 +-
 setup.py                           |    2 +-
 test/sample_objects.py             |   95 +-
 test/test_integration.py           |  125 +-
 vertica_sqlalchemy_dialect/base.py | 1742 +++++++++++++++++-----------
 8 files changed, 1210 insertions(+), 762 deletions(-)

diff --git a/.github/workflows/dialecttest.yml b/.github/workflows/dialecttest.yml
index 0734f46..ccd77b8 100644
--- a/.github/workflows/dialecttest.yml
+++ b/.github/workflows/dialecttest.yml
@@ -57,7 +57,7 @@ jobs:
         python -m ensurepip --upgrade
         python -m venv venv
         source venv/bin/activate
-        python -m pip install setuptools wheel pytest pyodbc sqlalchemy==1.4
+        python -m pip install setuptools wheel pytest pyodbc sqlalchemy==1.4.44
         python setup.py install
     - name: Run tests
       # This step references the directory that contains the action.
diff --git a/README.md b/README.md
index 867c50e..3de6219 100644
--- a/README.md
+++ b/README.md
@@ -17,7 +17,7 @@ You will need the following softwares to run, build and test the dialect. Everyt
 
 1. Python 3.x or higher
 2. pip 22 or higher
-3. sqlalchemy>=1.3.24,<=1.4
+3. sqlalchemy>=1.3.24,<=1.4.44
 4. vertica-python 1.1.1 or higher
 
 ### Vertica-Python
diff --git a/README.rst b/README.rst
index 1a7703b..2b7677b 100644
--- a/README.rst
+++ b/README.rst
@@ -16,7 +16,7 @@ You will need the following softwares to run, build and test the dialect. Everyt
 
 1. Python 3.x or higher
 2. pip 22 or higher
-3. sqlalchemy>=1.3.24,<=1.4
+3. 
sqlalchemy>=1.3.24,<=1.4.44 4. vertica-python 1.1.1 or higher ##################################### diff --git a/requirements.txt b/requirements.txt index 751909f..1052ecb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ six >= 1.10.0 -SQLAlchemy==1.4 +SQLAlchemy==1.4.44 vertica-python>=1.1.1 pyodbc>=4.0.16 \ No newline at end of file diff --git a/setup.py b/setup.py index a8259e4..cadd419 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ ), install_requires=( 'six >= 1.10.0', - 'sqlalchemy>=1.3.24,<=1.4', + 'sqlalchemy>=1.3.24,<=1.4.44', 'vertica-python>=1.1.1' ), extras_require={ diff --git a/test/sample_objects.py b/test/sample_objects.py index 2da3d5b..bc1b3cd 100644 --- a/test/sample_objects.py +++ b/test/sample_objects.py @@ -16,9 +16,94 @@ # See the License for the specific language governing permissions and # limitations under the License. -sample_table_list = {"store":["store_orders_fact"],"public": ["customer_dimension","employee_dimension","product_dimension", "vendor_dimension"]} +sample_table_list = [ + + 'agar_dish', + 'agar_dish_1', + 'agar_dish_2', + 'baseball', + 'clicks', + 'customer_dimension', + 'date_dimension', + 'dem_votes', + 'employee', + 'employee_dimension', + 'faithful', + 'faithful_testing', + 'faithful_training', + 'house84', + 'house84_clean', + 'house84_test', + 'house84_train', + 'inventory_fact', + 'iris', + 'iris1', + 'iris2', + 'mtcars', + 'mtcars_test', + 'mtcars_train', + 'phrases', + 'product_dimension', + 'promotion_dimension', + 'readings', + 'rep_votes', + 'salary_data', + 'sampletemp', + 'shipping_dimension', + 'small_input_impute', + 'small_svd', + 'temp_data', + 'titanic_testing', + 'titanic_training', + 'transaction_data', + 'vendor_dimension', + 'vmart_load_success', + 'warehouse_dimension', + 'world' +] sample_temp_table = "sampletemp" -sample_projections = ["employee_super", "store_orders_fact_super", "ytd_orders"] +sample_projections = [ + 'date_dimension_super', + 'product_dimension_super', + 'promotion_dimension_super', + 'vendor_dimension_super', 'customer_dimension_super', + 'employee_dimension_super', + 'warehouse_dimension_super', + 'shipping_dimension_super', + 'inventory_fact_super', + 'readings_topk', + 'clicks_agg', + 'phrases_super', + 'sampletemp_super', + 'mtcars_super', + 'mtcars_train_super', + 'mtcars_test_super', + 'iris_super', + 'iris1_super', + 'iris2_super', + 'faithful_super', + 'faithful_testing_super', + 'faithful_training_super', + 'baseball_super', + 'transaction_data_super', + 'salary_data_super', + 'agar_dish_super', + 'agar_dish_1_super', + 'agar_dish_2_super', + 'house84_super', + 'house84_clean_super', + 'small_input_impute_super', + 'titanic_training_super', + 'titanic_testing_super', + 'small_svd_super', + 'employee_super', + 'temp_data_super', + 'world_super', + 'dem_votes_super', + 'rep_votes_super', + 'house84_train_super', + 'house84_test_super' +] sample_view = "sampleview" sample_columns = [ "product_key", @@ -78,9 +163,11 @@ sample_model_list = ["naive_house84_model"] -sample_tags = {"sampletemp": "dbadmin", "employee_dimension": "dbadmin", "clicks": "dbadmin"} +sample_tags = {'customer_dimension': 'dbadmin', 'product_dimension': 'dbadmin', 'promotion_dimension': 'dbadmin', 'date_dimension': 'dbadmin', 'vendor_dimension': 'dbadmin', 'employee_dimension': 'dbadmin', 'shipping_dimension': 'dbadmin', 'warehouse_dimension': 'dbadmin', 'inventory_fact': 'dbadmin', 'vmart_load_success': 'dbadmin'} sample_ml_model = "naive_house84_model" sample_oauth_name = "v_oauth" 
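[Editor's note] The integration tests changed below stop looking fixtures up in the old schema-keyed dict and instead index into these flat, alphabetically ordered lists, so element order is now load-bearing. A quick sanity check of the positions the rewritten tests rely on (a hypothetical snippet, not part of this patch):

```python
import test.sample_objects as sample

# Positions the rewritten tests depend on:
assert sample.sample_table_list[5] == "customer_dimension"   # test_has_table, test_get_table_comment
assert sample.sample_table_list[25] == "product_dimension"   # test_get_columns
# The list appears to be kept sorted; reordering it silently breaks the tests:
assert sample.sample_table_list == sorted(sample.sample_table_list)
```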
-sample_projection_properties = {'ROS_Count': 264, 'projection_name': 'store_orders_fact_super', 'Projection_Type': 'is_super_projection', 'is_segmented': 'True', 'Segmentation_key': 'hash(store_orders_fact.product_key, store_orders_fact.product_version, store_orders_fact.store_key, store_orders_fact.vendor_key, store_orders_fact.employee_key, store_orders_fact.order_number, store_orders_fact.date_ordered, store_orders_fact.date_shipped)', 'Partition_Key': '2014-06-04', 'projection_size': '7553 KB', 'Partition_Size': '2640'} + +sample_projection_properties = {'text': 'Vertica physically stores table data in projections, which are collections of table columns. Projections store data in a format that optimizes query execution For more info on projections and corresponding properties check out the Vertica Docs: https://www.vertica.com/docs', 'properties': {'ROS_Count': '1', 'Projection_Type': 'is_super_projection', 'is_segmented': 'True', 'Segmentation_key': 'hash(product_dimension.product_key, product_dimension.product_version)', 'projection_size': '19 KB', 'Partition_Key': 'Not Available', 'Partition_Size': '0', 'Projection_Cached': 'False'}} + diff --git a/test/test_integration.py b/test/test_integration.py index 7803aed..676f1c2 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -49,7 +49,7 @@ def test_has_schema(vconn): assert sc2 == True def test_has_table(vconn): - res = vconn[0].dialect.has_table(connection=vconn[1], table_name=sample.sample_table_list["store"][0], schema="store") + res = vconn[0].dialect.has_table(connection=vconn[1], table_name=sample.sample_table_list[5], schema="public") assert res == True def test_has_sequence(vconn): @@ -72,18 +72,13 @@ def test_get_schema_names(vconn): # TODO Improve this function to verify the output with a regex match def test_get_table_comment(vconn): - res = vconn[0].dialect.get_table_comment(connection=vconn[1], table_name=sample.sample_table_list["public"][0], schema="public") - table_comment = [] - for data in res['properties']: - if data['table_name'] == sample.sample_table_list["public"][0]: - table_comment.append(data) - - assert table_comment[0]['create_time'] - assert table_comment[0]['table_size'] + res = vconn[0].dialect.get_table_comment(connection=vconn[1], table_name=sample.sample_table_list[5], schema="public") + assert res['properties']['table_size'] == '2119 KB' + -# # TODO Improve this function to verify the output with a regex match +# TODO Improve this function to verify the output with a regex match def test_get_table_oid(vconn): - res = vconn[0].dialect.get_table_oid(connection=vconn[1], table_name=sample.sample_table_list["public"][1], schema="public") + res = vconn[0].dialect.get_table_oid(connection=vconn[1], table_name=sample.sample_table_list[5], schema="public") # Assert the oid is an int assert type(res) == int # Assert the format of the oid @@ -95,20 +90,15 @@ def test_get_projection_names(vconn): # Assert the no. of projections assert len(res) == 41 # Assert sample projection - assert sample.sample_projections[0] in res + assert sample.sample_projections == res def test_get_table_names(vconn): res = vconn[0].dialect.get_table_names(connection=vconn[1], schema="public") # Assert the no. 
of tables assert len(res) == 42 # Assert sample tables - assert all(value in res for value in sample.sample_table_list["public"]) + assert sample.sample_table_list == res - res = vconn[0].dialect.get_table_names(connection=vconn[1], schema="store") - # Assert the no of tables in another schema - assert len(res) == 3 - # Assert sample tables - assert all(value in res for value in sample.sample_table_list["store"]) def test_get_temp_table_names(vconn): res = vconn[0].dialect.get_temp_table_names(connection=vconn[1], schema="public") @@ -140,7 +130,8 @@ def test_get_temp_view_names(vconn): assert sample.sample_view in res def test_get_columns(vconn): - res = vconn[0].dialect.get_columns(connection=vconn[1], table_name=sample.sample_table_list["public"][2], schema="public") + res = vconn[0].dialect.get_columns(connection=vconn[1], table_name=sample.sample_table_list[25], schema="public") + print(res) # Assert the no. of columns assert len(res)>0 # Assert sample columns @@ -149,7 +140,7 @@ def test_get_columns(vconn): def test_get_unique_constraints(vconn): # TODO query doesnt return the result here. Query works from other clients. assert True - ucons = vconn[0].dialect.get_unique_constraints(connection=vconn[1], table_name=sample.sample_table_list["store"][0], schema="store") + ucons = vconn[0].dialect.get_unique_constraints(connection=vconn[1], table_name=sample.sample_table_list[5], schema="public") # Assert the no. of unique contraints assert len(ucons)>0 # Assert sample constraint @@ -178,17 +169,12 @@ def test_denormalize_name(vconn): def test_get_pk_constraint(vconn): # TODO query doesnt return the result here. Query works from other clients. - res = vconn[0].dialect.get_pk_constraint(connection=vconn[1], table_name=sample.sample_table_list["public"][0], schema="public") - pk_constraint = '' - for data in res: - if data['tablename'] == sample.sample_table_list["public"][0]: - pk_constraint = data['name'] - + res = vconn[0].dialect.get_pk_constraint(connection=vconn[1], table_name=sample.sample_table_list[5], schema="public") - # Assert the no. of unique contraints +# # Assert the no. 
of unique contraints assert len(res)>0 - # Assert sample constraint - assert pk_constraint == sample.sample_pk +# # Assert sample constraint + assert res['constrained_columns'] == sample.sample_pk def test_get_foreign_keys(vconn): @@ -202,7 +188,7 @@ def test_get_foreign_keys(vconn): def test_get_column_info(vconn): # TODO Add more tests here for other datatypes - res = vconn[0].dialect._get_column_info(name="customer_name", data_type="varchar(256)", default=None, is_nullable=False) + res = vconn[0].dialect._get_column_info(name="customer_name", data_type="varchar(256)", default=None, is_nullable=False, table_name="customer_dimension",schema='public') assert res['name'] == 'customer_name' assert res['autoincrement'] == False assert res['nullable'] == False @@ -227,38 +213,16 @@ def test_get_segmented(vconn): assert isseg[0] in ["True","False"] assert isseg[1] -def test_get_partitionkey(vconn): - pc = vconn[0].dialect._get_partitionkey(vconn[1], projection_name=sample.sample_projections[2], schema="store") - assert pc - -def test_get_projectiontype(vconn): - pt = vconn[0].dialect._get_projectiontype(vconn[1], projection_name=sample.sample_projections[1], schema="store") - assert pt == ['is_super_projection'] - -def test_get_numpartitions(vconn): - pn = vconn[0].dialect._get_numpartitions(vconn[1], projection_name=sample.sample_projections[1], schema="store") - assert pn>0 -def test_get_projectionsize(vconn): - ps = vconn[0].dialect._get_projectionsize(vconn[1], projection_name=sample.sample_projections[1], schema="store") - assert ps>0 - -def test_get_ifcachedproj(vconn): - cp = vconn[0].dialect._get_ifcachedproj(vconn[1], projection_name=sample.sample_projections[1], schema="store") - assert cp in [True,False] def test_get_projection_comment(vconn): - pc = vconn[0].dialect.get_projection_comment(vconn[1], projection_name = sample.sample_projections[1], schema="store") - projection_name=sample.sample_projections[1] + pc = vconn[0].dialect.get_projection_comment(vconn[1], projection = sample.sample_projections[1], schema="public") + projection_comments = sample.sample_projection_properties - properties = [] - for data in pc['properties']: - if data["projection_name"] == projection_name: - properties.append(data) + - print(properties[0]) - - assert properties[0] == projection_comments + assert pc == projection_comments + def test_get_model_comment(vconn): @@ -280,26 +244,12 @@ def test_get_oauth_comment(vconn): def test_get_all_owners(vconn): - owner = vconn[0].dialect.get_table_owner(vconn[1],schema='public') - table_owner = owner[0][1] + owner = vconn[0].dialect.get_table_owner(vconn[1],table=sample.sample_table_list[0] , schema='public') + table_owner = owner assert table_owner == "dbadmin" -def test_get_all_columns(vconn): - res = vconn[0].dialect.get_all_columns(connection=vconn[1], table = sample.sample_table_list["public"][2] , schema="public") - table_name=sample.sample_table_list["public"][2] - - columns = [] - for data in res: - if data['tablename'] == table_name: - columns.append(data) - # Assert the no. of columns - assert len(columns)>0 - # # Assert sample columns - assert all(value["name"] in sample.sample_columns for value in columns) - - def test_get_all_view_columns(vconn): - res = vconn[0].dialect.get_all_view_columns(connection=vconn[1],view_name = sample.sample_view, schema="public") + res = vconn[0].dialect.get_view_columns(connection=vconn[1],view = sample.sample_view, schema="public") # Assert the no. 
of columns assert len(res)>0 # Assert sample columns @@ -307,7 +257,9 @@ def test_get_all_view_columns(vconn): def test_get_view_comment(vconn): - res = vconn[0].dialect.get_view_comment(connection=vconn[1],view_name = sample.sample_view, schema="public") + + res = vconn[0].dialect.get_view_comment(connection=vconn[1],view = sample.sample_view, schema="public") + if res['properties'] is not None: has_comment = True else: @@ -317,38 +269,37 @@ def test_get_view_comment(vconn): def test_get_view_owner(vconn): - owner = vconn[0].dialect.get_view_owner(vconn[1],schema='public') - table_owner = owner[0][1] + owner = vconn[0].dialect.get_view_owner(vconn[1],view=sample.sample_view , schema='public') + table_owner = owner assert table_owner == "dbadmin" def test_get_projection_owner(vconn): - owner = vconn[0].dialect.get_projection_owner(vconn[1],schema='public') - table_owner = owner[0][1] + owner = vconn[0].dialect.get_projection_owner(vconn[1],projection=sample.sample_projections[1] , schema='public') + table_owner = owner assert table_owner == "dbadmin" def test_get_all_projection_columns(vconn): - res = vconn[0].dialect.get_all_projection_columns(connection=vconn[1], projection_name='inventory_fact_super', schema="public") + + res = vconn[0].dialect.get_projection_columns(connection=vconn[1], projection='inventory_fact_super', schema="public") + projection_name = 'inventory_fact_super' - columns = [] - for data in res: - if data['tablename'] == projection_name: - columns.append(data) + # Assert the no. of columns assert len(res)>0 # Assert sample columns - assert all(value["name"] in sample.sample_projection_columns for value in columns) + assert all(value["name"] in sample.sample_projection_columns for value in res) def test__populate_view_lineage(vconn): - res = vconn[0].dialect._populate_view_lineage(connection=vconn[1], schema="public") + res = vconn[0].dialect._populate_view_lineage(connection=vconn[1], view=sample.sample_view ,schema="public") upstream = "public.customer_dimension" downstream = next(iter(res.keys())) assert res[downstream][0][0] == upstream def test__populate_projection_lineage(vconn): - res = vconn[0].dialect._populate_projection_lineage(connection=vconn[1], schema="public") + res = vconn[0].dialect._populate_projection_lineage(connection=vconn[1],projection=sample.sample_projections[1] ,schema="public") upstream = "public.date_dimension" downstream = next(iter(res.keys())) assert res[downstream][0][0] == upstream diff --git a/vertica_sqlalchemy_dialect/base.py b/vertica_sqlalchemy_dialect/base.py index c835127..280b7f7 100644 --- a/vertica_sqlalchemy_dialect/base.py +++ b/vertica_sqlalchemy_dialect/base.py @@ -26,60 +26,83 @@ from sqlalchemy import sql from sqlalchemy import util from textwrap import dedent -import math -import re from collections import defaultdict from functools import lru_cache import re + from sqlalchemy.dialects.postgresql import BYTEA, DOUBLE_PRECISION, INTERVAL from sqlalchemy.dialects.postgresql.base import PGDialect, PGDDLCompiler from sqlalchemy.engine import default from sqlalchemy.engine import reflection -from sqlalchemy.types import INTEGER, BIGINT, SMALLINT, VARCHAR, CHAR, \ - NUMERIC, FLOAT, REAL, DATE, DATETIME, BOOLEAN, BLOB, TIMESTAMP, TIME +from sqlalchemy.types import ( + INTEGER, + BIGINT, + SMALLINT, + VARCHAR, + CHAR, + NUMERIC, + FLOAT, + REAL, + DATE, + DATETIME, + BOOLEAN, + BLOB, + TIMESTAMP, + TIME, + VARBINARY, + BINARY, +) from sqlalchemy.sql.sqltypes import TIME, TIMESTAMP, String from sqlalchemy.sql import sqltypes 
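[Editor's note] base.py already imports `lru_cache` in the context lines just above, so the `+from functools import lru_cache` added below is redundant (harmless, but one of the two can be dropped). More importantly, decorating instance methods with `lru_cache(maxsize=None)`, as the new `fetch_*` helpers in this patch do, has two documented caveats: every argument, including the SQLAlchemy `Connection`, becomes part of the cache key and must be hashable, and the cache keeps strong references to `self` and those connections for the life of the process. A minimal sketch of the effect (hypothetical names, not code from this patch):

```python
from functools import lru_cache

class Dialect:
    @lru_cache(maxsize=None)          # cache key = (self, connection, schema)
    def fetch_table_properties(self, connection, schema):
        print("querying catalog for", schema)   # runs only on a cache miss
        return [{"table_name": "t1"}]

d = Dialect()
d.fetch_table_properties("conn-1", "public")   # executes the query
d.fetch_table_properties("conn-1", "public")   # cache hit: no query
d.fetch_table_properties("conn-2", "public")   # new connection object -> cache miss
```

If stale metadata or reference retention ever becomes a concern, a per-instance dict keyed on schema (or a TTL cache such as `cachetools.TTLCache`) is a common alternative.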
+from functools import lru_cache + logger: logging.Logger = logging.getLogger(__name__) ischema_names = { - 'INT': INTEGER, - 'INTEGER': INTEGER, - 'INT8': INTEGER, - 'BIGINT': BIGINT, - 'SMALLINT': SMALLINT, - 'TINYINT': SMALLINT, - 'CHAR': CHAR, - 'VARCHAR': VARCHAR, - 'VARCHAR2': VARCHAR, - 'TEXT': VARCHAR, - 'NUMERIC': NUMERIC, - 'DECIMAL': NUMERIC, - 'NUMBER': NUMERIC, - 'MONEY': NUMERIC, - 'FLOAT': FLOAT, - 'FLOAT8': FLOAT, - 'REAL': REAL, - 'DOUBLE': DOUBLE_PRECISION, - 'TIMESTAMP': TIMESTAMP, - 'TIMESTAMP WITH TIMEZONE': TIMESTAMP, - 'TIMESTAMPTZ': TIMESTAMP(timezone=True), - 'TIME': TIME, - 'TIME WITH TIMEZONE': TIME, - 'TIMETZ': TIME(timezone=True), - 'INTERVAL': INTERVAL, - 'DATE': DATE, - 'DATETIME': DATETIME, - 'SMALLDATETIME': DATETIME, - 'BINARY': BLOB, - 'VARBINARY': BLOB, - 'RAW': BLOB, - 'BYTEA': BYTEA, - 'BOOLEAN': BOOLEAN, - 'LONG VARBINARY': BLOB, - 'LONG VARCHAR': VARCHAR, - 'GEOMETRY': BLOB, + "INT": INTEGER, + "INTEGER": INTEGER, + "INT8": INTEGER, + "BIGINT": BIGINT, + "SMALLINT": SMALLINT, + "TINYINT": SMALLINT, + "CHAR": CHAR, + "VARCHAR": VARCHAR, + "VARCHAR2": VARCHAR, + "TEXT": VARCHAR, + "NUMERIC": NUMERIC, + "DECIMAL": NUMERIC, + "NUMBER": NUMERIC, + "MONEY": NUMERIC, + "FLOAT": FLOAT, + "FLOAT8": FLOAT, + "REAL": REAL, + "DOUBLE": DOUBLE_PRECISION, + "TIMESTAMP": TIMESTAMP, + "TIMESTAMP WITH TIMEZONE": TIMESTAMP(timezone=True), + "TIMESTAMPTZ": TIMESTAMP(timezone=True), + "TIME": TIME, + "TIME WITH TIMEZONE": TIME(timezone=True), + "TIMETZ": TIME(timezone=True), + "INTERVAL": INTERVAL, + "INTERVAL HOUR TO SECOND":INTERVAL, + "INTERVAL HOUR TO MINUTE":INTERVAL, + "INTERVAL DAY TO SECOND":INTERVAL, + "INTERVAL YEAR TO MONTH":INTERVAL, + "DOUBLE PRECISION": DOUBLE_PRECISION, + "DATE": DATE, + "DATETIME": DATETIME, + "SMALLDATETIME": DATETIME, + "BINARY": BINARY, + "VARBINARY": VARBINARY, + "RAW": BLOB, + "BYTEA": BYTEA, + "BOOLEAN": BOOLEAN, + "LONG VARBINARY": BLOB, + "LONG VARCHAR": VARCHAR, + "GEOMETRY": BLOB, + "GEOGRAPHY":BLOB } @@ -91,25 +114,31 @@ class UUID(String): class TIMESTAMP_WITH_PRECISION(TIMESTAMP): """The SQL TIMESTAMP With Precision type. + Since Vertica supports precision values for timestamp this allows ingestion of timestamp fields with precision values. PS: THIS DATA IS CURRENTLY UNUSED, IT JUST FIXES INGESTION PROBLEMS TODO: Should research the possibility of reflecting the precision in the schema + """ __visit_name__ = "TIMESTAMP" def __init__(self, timezone=False, precision=None): """Construct a new :class:`_types.TIMESTAMP_WITH_PRECISION`. + :param timezone: boolean. Indicates that the TIMESTAMP type should enable timezone support, if available on the target database. On a per-dialect basis is similar to "TIMESTAMP WITH TIMEZONE". If the target database does not support timezones, this flag is ignored. :param precision: integer. Indicates the PRECISION field when provided + + """ super(TIMESTAMP, self).__init__(timezone=timezone) self.precision = precision + def TIMESTAMP_WITH_TIMEZONE(*args, **kwargs): @@ -142,12 +171,12 @@ def get_column_specification(self, column, **kwargs): class VerticaInspector(reflection.Inspector): - dialect: VerticaDialect + dialect: VerticaDialect def get_projection_names( self, schema: Optional[str] = None, **kw: Any ) -> List[str]: - r"""Return all Models names within a particular schema. 
""" + r"""Return all Models names within a particular schema.""" return self.dialect.get_projection_names( self.bind, schema, info_cache=self.info_cache, **kw @@ -172,11 +201,9 @@ def _get_extra_tags(self, table, schema=None): :param: table: Name of the table """ - return self.dialect._get_extra_tags( - self.bind, table, schema - ) + return self.dialect._get_extra_tags(self.bind, table, schema) - def get_projection_comment(self, schema=None, **kw): + def get_projection_comment(self, projection,schema=None, **kw): """Return information about the table properties for ``table_name``. as key and value. @@ -186,7 +213,7 @@ def get_projection_comment(self, schema=None, **kw): """ return self.dialect.get_projection_comment( - self.bind, schema, info_cache=self.info_cache, **kw + self.bind,projection, schema, info_cache=self.info_cache, **kw ) def get_model_comment(self, model_name, schema=None, **kw): @@ -227,7 +254,7 @@ def get_oauth_comment(self, oauth, schema=None, **kw): self.bind, oauth, schema, info_cache=self.info_cache, **kw ) - def _get_database_properties(self, db_name, **kw): + def _get_database_properties(self, db_name, **kw): """Return information about the database properties . as key and value. @@ -235,9 +262,7 @@ def _get_database_properties(self, db_name, **kw): :param: db_name return dictionary """ - return self.dialect._get_database_properties( - self.bind, db_name, **kw - ) + return self.dialect._get_database_properties(self.bind, db_name, **kw) def _get_schema_properties(self, schema, **kw): """Return information about the schema properties . @@ -247,91 +272,84 @@ def _get_schema_properties(self, schema, **kw): :param: db_name return dictionary """ - return self.dialect._get_schema_properties( - self.bind, schema, **kw - ) - - def get_table_owner(self, schema: Optional[str] = None, **kw: Any): - r"""Return primary key columns names within a particular schema. """ + return self.dialect._get_schema_properties(self.bind, schema, **kw) + + def get_table_owner( + self, table: Optional[str] = None, schema: Optional[str] = None, **kw: Any + ): + r"""Return primary key columns names within a particular schema.""" return self.dialect.get_table_owner( - self.bind, schema, info_cache=self.info_cache, **kw + self.bind, table, schema, info_cache=self.info_cache, **kw ) - def get_all_columns(self,schema: Optional[str] = None, **kw: Any): - r"""Return all table columns names within a particular schema. """ + def get_all_columns(self, table, schema: Optional[str] = None, **kw: Any): + r"""Return all table columns names within a particular schema.""" return self.dialect.get_all_columns( - self.bind, schema, info_cache=self.info_cache, **kw + self.bind, table, schema, info_cache=self.info_cache, **kw ) - def get_pk_constraint(self, schema=None, **kw): - - return self.dialect.get_pk_constraint( - self.bind, schema, info_cache=self.info_cache, **kw - ) - - def get_table_comment(self, table: Optional[str] = None,schema: Optional[str] = None,**kw): - + + + def get_table_comment( + self, table: Optional[str] = None, schema: Optional[str] = None, **kw + ): return self.dialect.get_table_comment( - self.bind,table , schema, info_cache=self.info_cache, **kw + self.bind, table, schema, info_cache=self.info_cache, **kw ) - - def get_all_view_columns(self, schema: Optional[str] = None, **kw: Any): - r"""Return all view columns names within a particular schema. 
""" - return self.dialect.get_all_view_columns( - self.bind , schema, info_cache=self.info_cache, **kw + def get_view_columns(self,view: Optional[str] = None, schema: Optional[str] = None, **kw: Any): + r"""Return all view columns names within a particular schema.""" + + return self.dialect.get_view_columns( + self.bind,view, schema, info_cache=self.info_cache, **kw ) - def get_view_comment(self,schema: Optional[str] = None,**kw): - r"""Return view comments within a particular schema. """ + def get_view_comment(self, view: Optional[str] = None, schema: Optional[str] = None, **kw): + r"""Return view comments within a particular schema.""" return self.dialect.get_view_comment( - self.bind,schema, info_cache=self.info_cache, **kw + self.bind, view, schema, info_cache=self.info_cache, **kw ) - - - def get_view_owner(self, schema: Optional[str] = None, **kw: Any): - r"""Return primary key columns names within a particular schema. """ + + def get_view_owner(self,view: Optional[str] = None, schema: Optional[str] = None, **kw: Any): + r"""Return primary key columns names within a particular schema.""" return self.dialect.get_view_owner( - self.bind, schema, info_cache=self.info_cache, **kw - ) - - def _populate_view_lineage(self, schema: Optional[str] = None, **kw: Any): - r"""Return primary key columns names within a particular schema. """ + self.bind, view, schema, info_cache=self.info_cache, **kw + ) - return self.dialect._populate_view_lineage( - self.bind, schema , **kw - ) - - def get_all_projection_columns(self,schema: Optional[str] = None, **kw: Any): - r"""Return all projection columns names within a particular schema. """ + def _populate_view_lineage(self, view: Optional[str] = None, schema: Optional[str] = None, **kw: Any): + r"""Return upstream and downstream of a view.""" + + return self.dialect._populate_view_lineage(self.bind, view, schema, **kw) - return self.dialect.get_all_projection_columns( - self.bind,schema, info_cache=self.info_cache, **kw + def get_projection_columns(self, projection: Optional[str] = None, schema: Optional[str] = None, **kw: Any): + r"""Return all projection columns names within a particular schema.""" + + return self.dialect.get_projection_columns( + self.bind,projection, schema, info_cache=self.info_cache, **kw ) - - def get_projection_owner(self, schema: Optional[str] = None, **kw: Any): - r"""Return all projection columns names within a particular schema. """ + + def get_projection_owner(self,projection, schema: Optional[str] = None, **kw: Any): + r"""Return all projection columns names within a particular schema.""" return self.dialect.get_projection_owner( - self.bind, schema, info_cache=self.info_cache, **kw + self.bind,projection, schema, info_cache=self.info_cache, **kw ) - - def _populate_projection_lineage(self, schema: Optional[str] = None, **kw: Any): - r"""Return primary key columns names within a particular schema. 
""" - return self.dialect._populate_projection_lineage( - self.bind, schema , **kw - ) + def _populate_projection_lineage(self,projection, schema: Optional[str] = None, **kw: Any): + r"""Return primary key columns names within a particular schema.""" + + return self.dialect._populate_projection_lineage(self.bind, projection,schema, **kw) + + # noinspection PyArgumentList,PyAbstractClass class VerticaDialect(default.DefaultDialect): - - name = 'vertica' + name = "vertica" ischema_names = ischema_names ddl_compiler = VerticaDDLCompiler inspector = VerticaInspector @@ -353,7 +371,8 @@ def _get_server_version_info(self, connection): m = re.match(r".*Vertica Analytic Database v(\d+)\.(\d+)\.(\d)+.*", v) if not m: raise AssertionError( - "Could not determine version from string '%(ver)s'" % {'ver': v}) + "Could not determine version from string '%(ver)s'" % {"ver": v} + ) return tuple([int(x) for x in m.group(1, 2, 3) if x is not None]) # noinspection PyRedeclaration @@ -361,17 +380,22 @@ def _get_default_schema_name(self, connection): return connection.scalar("SELECT current_schema()") def create_connect_args(self, url): - opts = url.translate_connect_args(username='user') + opts = url.translate_connect_args(username="user") opts.update(url.query) return [], opts def has_schema(self, connection, schema): - has_schema_sql = sql.text(dedent(""" + has_schema_sql = sql.text( + dedent( + """ SELECT EXISTS ( SELECT schema_name FROM v_catalog.schemata WHERE lower(schema_name) = '%(schema)s') - """ % {'schema': schema.lower()})) + """ + % {"schema": schema.lower()} + ) + ) c = connection.execute(has_schema_sql) return bool(c.scalar()) @@ -380,13 +404,18 @@ def has_table(self, connection, table_name, schema=None): if schema is None: schema = self._get_default_schema_name(connection) - has_table_sql = sql.text(dedent(""" + has_table_sql = sql.text( + dedent( + """ SELECT EXISTS ( SELECT table_name FROM v_catalog.all_tables WHERE lower(table_name) = '%(table)s' AND lower(schema_name) = '%(schema)s') - """ % {'schema': schema.lower(), 'table': table_name.lower()})) + """ + % {"schema": schema.lower(), "table": table_name.lower()} + ) + ) c = connection.execute(has_table_sql) return bool(c.scalar()) @@ -395,100 +424,140 @@ def has_sequence(self, connection, sequence_name, schema=None): if schema is None: schema = self._get_default_schema_name(connection) - has_seq_sql = sql.text(dedent(""" + has_seq_sql = sql.text( + dedent( + """ SELECT EXISTS ( SELECT sequence_name FROM v_catalog.sequences WHERE lower(sequence_name) = '%(sequence)s' AND lower(sequence_schema) = '%(schema)s') - """ % {'schema': schema.lower(), 'sequence': sequence_name.lower()})) + """ + % {"schema": schema.lower(), "sequence": sequence_name.lower()} + ) + ) c = connection.execute(has_seq_sql) return bool(c.scalar()) def has_type(self, connection, type_name): - has_type_sql = sql.text(dedent(""" + has_type_sql = sql.text( + dedent( + """ SELECT EXISTS ( SELECT type_name FROM v_catalog.types WHERE lower(type_name) = '%(type)s') - """ % {'type': type_name.lower()})) + """ + % {"type": type_name.lower()} + ) + ) c = connection.execute(has_type_sql) return bool(c.scalar()) def _get_database_properties(self, connection, database): - - - try: - - - cluster_type_qry = sql.text(dedent( - """SELECT CASE COUNT(*) WHEN 0 THEN 'Enterprise' ELSE 'Eon' END AS database_mode FROM v_catalog.shards """)) + cluster_type_qry = sql.text( + dedent( + """SELECT CASE COUNT(*) WHEN 0 THEN 'Enterprise' ELSE 'Eon' END AS database_mode FROM v_catalog.shards """ + ) + 
) - communal_storage_path = sql.text(dedent( - """SELECT location_path from storage_locations WHERE sharing_type = 'COMMUNAL' """)) + communal_storage_path = sql.text( + dedent( + """SELECT location_path from storage_locations WHERE sharing_type = 'COMMUNAL' """ + ) + ) cluster_type = "" communal_path = "" cluster_type_res = connection.execute(cluster_type_qry) for each in cluster_type_res: cluster_type = each.database_mode - if cluster_type.lower() == 'eon': + if cluster_type.lower() == "eon": for each in connection.execute(communal_storage_path): communal_path += str(each.location_path) + " | " - SUBCLUSTER_SIZE = sql.text(dedent(""" + SUBCLUSTER_SIZE = sql.text( + dedent( + """ SELECT subclusters.subcluster_name , CAST(sum(disk_space_used_mb // 1024) as varchar(10)) as subclustersize from subclusters inner join disk_storage using (node_name) group by subclusters.subcluster_name - """)) + """ + ) + ) subclusters = " " for data in connection.execute(SUBCLUSTER_SIZE): - subclusters += f"{data['subcluster_name']} -- {data['subclustersize']} GB | " - cluster__size = sql.text(dedent(""" + subclusters += ( + f"{data['subcluster_name']} -- {data['subclustersize']} GB | " + ) + cluster__size = sql.text( + dedent( + """ select ROUND(SUM(disk_space_used_mb) //1024 ) as cluster_size from disk_storage - """)) + """ + ) + ) cluster_size = "" for each in connection.execute(cluster__size): cluster_size = str(each.cluster_size) + " GB" - return {"cluster_type": cluster_type, "cluster_size": cluster_size, 'subcluster': subclusters, - "communal_storage_path": communal_path} + return { + "cluster_type": cluster_type, + "cluster_size": cluster_size, + "subcluster": subclusters, + "communal_storage_path": communal_path, + } except Exception as ex: - logging.warning( - f"{database}", f"unable to get extra_properties : {ex}") + logging.warning(f"{database}", f"unable to get extra_properties : {ex}") def _get_schema_properties(self, connection, schema): try: - # Projection count - projection_count_query = sql.text(dedent(""" + projection_count_query = sql.text( + dedent( + """ SELECT COUNT(projection_name) as pc from v_catalog.projections WHERE lower(projection_schema) = '%(schema)s' - """ % {"schema": schema.lower()})) + """ + % {"schema": schema.lower()} + ) + ) projection_count = None for each in connection.execute(projection_count_query): projection_count = each.pc - UDL_LANGUAGE = sql.text(dedent(""" + UDL_LANGUAGE = sql.text( + dedent( + """ SELECT lib_name , description FROM USER_LIBRARIES WHERE lower(schema_name) = '%(schema)s' - """ % {"schema": schema.lower()})) + """ + % {"schema": schema.lower()} + ) + ) # UDX list - UDX_functions_qry = sql.text(dedent(""" + UDX_functions_qry = sql.text( + dedent( + """ SELECT function_name FROM USER_FUNCTIONS Where schema_name = '%(schema)s' - """ % {'schema': schema.lower(), })) + """ + % { + "schema": schema.lower(), + } + ) + ) udx_list = "" for each in connection.execute(UDX_functions_qry): udx_list += each.function_name + ", " @@ -497,85 +566,110 @@ def _get_schema_properties(self, connection, schema): user_defined_library = "" for data in connection.execute(UDL_LANGUAGE): - user_defined_library += f"{data['lib_name']} -- {data['description']} | " + user_defined_library += ( + f"{data['lib_name']} -- {data['description']} | " + ) # print("projection_count: " + str(projection_count) - return {"projection_count": str(projection_count), - 'udx_list': str(udx_list), 'udx_language': str(user_defined_library)} + return { + "projection_count": str(projection_count), 
+ "udx_list": str(udx_list), + "udx_language": str(user_defined_library), + } # return {"projection_count": "projection_count"} except Exception as ex: self.report.report_failure( - f"{schema}", f"unable to get extra_properties : {ex}") + f"{schema}", f"unable to get extra_properties : {ex}" + ) @reflection.cache def get_schema_names(self, connection, **kw): - get_schemas_sql = sql.text(dedent(""" + get_schemas_sql = sql.text( + dedent( + """ SELECT schema_name FROM v_catalog.schemata - """)) + """ + ) + ) c = connection.execute(get_schemas_sql) - return [row[0] for row in c if not row[0].startswith('v_')] - - @reflection.cache - def get_table_comment(self, connection,table_name, schema=None, **kw): - if schema is not None: - schema_condition = "lower(table_schema) = '%(schema)s'" % { - 'schema': schema.lower()} - else: - schema_condition = "1" - - - tables = table_name - sct = sql.text(dedent(""" + return [row[0] for row in c if not row[0].startswith("v_")] + + + @lru_cache(maxsize=None) + def fetch_table_properties(self,connection, schema): + sct = sql.text( + dedent( + """ SELECT create_time , table_name FROM v_catalog.tables where lower(table_schema) = '%(schema)s' - """ % {'schema': schema.lower()})) - - sts = sql.text(dedent(""" + """ + % {"schema": schema.lower()} + ) + ) + sts = sql.text( + dedent( + """ SELECT used_bytes ,anchor_table_name FROM v_monitor.column_storage where lower(anchor_table_schema) = '%(schema)s' - """ % {'schema': schema.lower()})) - - columns = [] - + """ + % {"schema": schema.lower()} + ) + ) + properties = [] for row in connection.execute(sct): - - - columns.append({'create_time': str(row[0]), 'table_name': row[1]}) - - table_size_dict={} + properties.append({"create_time": str(row[0]), "table_name": row[1]}) + table_size_dict = {} for table_size in connection.execute(sts): - if row[1] in tables: - if table_size[1] not in table_size_dict: - table_size_dict[table_size[1]] = table_size[0] / 1024 - - else: - table_size_dict[table_size[1]] += table_size[0] / 1024 - - for a in columns: - if a['table_name'] in table_size_dict: - a['table_size'] = str(int(table_size_dict[a['table_name']])) + " KB" - + if table_size[1] not in table_size_dict: + table_size_dict[table_size[1]] = table_size[0] / 1024 else: - a['table_size'] = "0 KB" - a['table_name'] = a['table_name'].lower() - return {"text": "References the properties of a native table in Vertica. \ - Vertica physically stores table data in projections, which are collections of table columns. \ - Projections store data in a format that optimizes query execution. \ - In order to query or perform any operation on a Vertica table, the table must have one or more projections associated with it. 
", - "properties": columns } + table_size_dict[table_size[1]] += table_size[0] / 1024 + for a in properties: + if a["table_name"] in table_size_dict: + a["table_size"] = str(int(table_size_dict[a["table_name"]])) + " KB" + else: + a["table_size"] = "0 KB" + a["table_name"] = a["table_name"].lower() + return properties + + def get_table_comment(self, connection, table_name, schema=None, **kw): + if schema is not None: + schema = schema.lower() + + properties = self.fetch_table_properties(connection, schema) + filtered_properties = [ + prop + for prop in properties + if prop["table_name"].lower() == table_name.lower() + ] + + table_properties = { + "create_time": filtered_properties[0]['create_time'], + "table_size": filtered_properties[0]['table_size'], + } + + return { + "text": "References the properties of a native table in Vertica. \ + Vertica physically stores table data in projections, which are collections of table columns. \ + Projections store data in a format that optimizes query execution. \ + In order to query or perform any operation on a Vertica table, the table must have one or more projections associated with it. ", + "properties": table_properties, + } @reflection.cache def get_table_oid(self, connection, table_name, schema=None, **kw): if schema is None: schema = self._get_default_schema_name(connection) - get_oid_sql = sql.text(dedent(""" + get_oid_sql = sql.text( + dedent( + """ SELECT A.table_id FROM (SELECT table_id, table_name, table_schema FROM v_catalog.tables @@ -583,7 +677,10 @@ def get_table_oid(self, connection, table_name, schema=None, **kw): SELECT table_id, table_name, table_schema FROM v_catalog.views) AS A WHERE lower(A.table_name) = '%(table)s' AND lower(A.table_schema) = '%(schema)s' - """ % {'schema': schema.lower(), 'table': table_name.lower()})) + """ + % {"schema": schema.lower(), "table": table_name.lower()} + ) + ) c = connection.execute(get_oid_sql) table_oid = c.scalar() @@ -595,15 +692,21 @@ def get_table_oid(self, connection, table_name, schema=None, **kw): def get_projection_names(self, connection, schema=None, **kw): if schema is not None: schema_condition = "lower(projection_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" - get_projection_sql = sql.text(dedent(""" + get_projection_sql = sql.text( + dedent( + """ SELECT projection_name from v_catalog.projections WHERE %(schema_condition)s - """ % {'schema_condition': schema_condition})) + """ + % {"schema_condition": schema_condition} + ) + ) c = connection.execute(get_projection_sql) @@ -613,16 +716,22 @@ def get_projection_names(self, connection, schema=None, **kw): def get_table_names(self, connection, schema=None, **kw): if schema is not None: schema_condition = "lower(table_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" - get_tables_sql = sql.text(dedent(""" + get_tables_sql = sql.text( + dedent( + """ SELECT table_name FROM v_catalog.tables WHERE %(schema_condition)s ORDER BY table_schema, table_name - """ % {'schema_condition': schema_condition})) + """ + % {"schema_condition": schema_condition} + ) + ) c = connection.execute(get_tables_sql) return [row[0] for row in c] @@ -631,17 +740,23 @@ def get_table_names(self, connection, schema=None, **kw): def get_temp_table_names(self, connection, schema=None, **kw): if schema is not None: schema_condition = "lower(table_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } 
else: schema_condition = "1" - get_tables_sql = sql.text(dedent(""" + get_tables_sql = sql.text( + dedent( + """ SELECT table_name FROM v_catalog.tables WHERE %(schema_condition)s AND IS_TEMP_TABLE ORDER BY table_schema, table_name - """ % {'schema_condition': schema_condition})) + """ + % {"schema_condition": schema_condition} + ) + ) c = connection.execute(get_tables_sql) return [row[0] for row in c] @@ -650,100 +765,112 @@ def get_temp_table_names(self, connection, schema=None, **kw): def get_view_names(self, connection, schema=None, **kw): if schema is not None: schema_condition = "lower(table_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" - get_views_sql = sql.text(dedent(""" + get_views_sql = sql.text( + dedent( + """ SELECT table_name FROM v_catalog.views WHERE %(schema_condition)s ORDER BY table_schema, table_name - """ % {'schema_condition': schema_condition})) + """ + % {"schema_condition": schema_condition} + ) + ) c = connection.execute(get_views_sql) return [row[0] for row in c] - - def get_view_definition(self, connection, view_name, schema=None, **kw): + + @lru_cache(maxsize=None) + def fetch_view_definitions(self, connection,schema): if schema is not None: schema_condition = "lower(table_schema) = '%(schema)s'" % { "schema": schema.lower() } else: schema_condition = "1" - - view_def = connection.scalar( - sql.text( + + definition = [] + + view_def = sql.text( dedent( """ - SELECT VIEW_DEFINITION + SELECT VIEW_DEFINITION , table_name FROM V_CATALOG.VIEWS - WHERE table_name='%(view_name)s' AND %(schema_condition)s + WHERE table_schema='%(schema)s' """ - % {"view_name": view_name, "schema_condition": schema_condition} + % {"schema": schema.lower()} ) ) - ) - - return view_def + + for data in connection.execute(view_def): + definition.append({ + "view_def": data['VIEW_DEFINITION'], + "table_name": data['table_name'] + }) - # Vertica does not support global temporary views. 
- @reflection.cache - def get_temp_view_names(self, connection, schema=None, **kw): - return [] + return definition + - @reflection.cache - def get_columns(self, connection, table_name, schema=None, **kw): + def get_view_definition(self, connection, view_name, schema=None, **kw): if schema is not None: schema_condition = "lower(table_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" + + view_def = self.fetch_view_definitions(connection,schema) + + def_info = [ + prop for prop in view_def if prop["table_name"].lower() == view_name.lower() + ] + + if len(def_info) == 0: + return None + else: + return def_info[0]['view_def'] - s = sql.text(dedent(""" - SELECT column_name, data_type, column_default, is_nullable - FROM v_catalog.columns - WHERE lower(table_name) = '%(table)s' - AND %(schema_condition)s - UNION ALL - SELECT column_name, data_type, '' as column_default, true as is_nullable - FROM v_catalog.view_columns - WHERE lower(table_name) = '%(table)s' - AND %(schema_condition)s - UNION ALL - SELECT projection_column_name,data_type,'' as column_default, true as is_nullable - FROM PROJECTION_COLUMNS - WHERE lower(projection_name) = '%(table)s' - AND %(schema_condition)s - """ % {'table': table_name.lower(), 'schema_condition': schema_condition})) - spk = sql.text(dedent(""" - SELECT column_name - FROM v_catalog.primary_keys - WHERE lower(table_name) = '%(table)s' - AND constraint_type = 'p' - AND %(schema_condition)s - """ % {'table': table_name.lower(), 'schema_condition': schema_condition})) + return view_definition + + # Vertica does not support global temporary views. + @reflection.cache + def get_temp_view_names(self, connection, schema=None, **kw): + return [] + + @lru_cache(maxsize=None) + def fetch_table_columns(self, connection, schema): + s = sql.text( + dedent( + """ + SELECT column_name, data_type, '' as column_default, true as is_nullable, lower(table_name) as table_name + FROM v_catalog.columns + where lower(table_schema) = '%(schema)s' + """ + % {"schema": schema.lower()} + ) + ) - pk_columns = [x[0] for x in connection.execute(spk)] columns = [] for row in connection.execute(s): name = row.column_name dtype = row.data_type.lower() - primary_key = name in pk_columns default = row.column_default nullable = row.is_nullable - + table_name = row.table_name.lower() column_info = self._get_column_info( - name, - dtype, - default, - nullable, - schema, + name, dtype, default, nullable, table_name, schema ) - column_info.update({'primary_key': primary_key}) columns.append(column_info) + return columns + + # TODO this function doesnt seem to work even though the query is right @@ -763,24 +890,29 @@ def get_unique_constraints(self, connection, table_name, schema=None, **kw): ) ) c = connection.execute(get_constraints_sql) - return [{'name': name, 'column_names': cols} for name, cols in c.fetchall()] + return [{"name": name, "column_names": cols} for name, cols in c.fetchall()] @reflection.cache - def get_check_constraints( - self, connection, table_name, schema=None, **kw): - table_oid = self.get_table_oid(connection, table_name, schema, - info_cache=kw.get('info_cache')) + def get_check_constraints(self, connection, table_name, schema=None, **kw): + table_oid = self.get_table_oid( + connection, table_name, schema, info_cache=kw.get("info_cache") + ) - constraints_sql = sql.text(dedent(""" + constraints_sql = sql.text( + dedent( + """ SELECT constraint_name, column_name FROM v_catalog.constraint_columns WHERE table_id = 
%(oid)s AND constraint_type = 'c' - """ % {'oid': table_oid})) + """ + % {"oid": table_oid} + ) + ) c = connection.execute(constraints_sql) - return [{'name': name, 'sqltext': col} for name, col in c.fetchall()] + return [{"name": name, "sqltext": col} for name, col in c.fetchall()] def normalize_name(self, name): name = name and name.rstrip() @@ -801,7 +933,7 @@ def denormalize_name(self, name): def get_foreign_keys(self, connection, table_name, schema=None, **kw): return [] - # TODO complete the foreign keys function + # TODO complete the foreign keys function @reflection.cache def get_indexes(self, connection, table_name, schema, **kw): return [] @@ -812,9 +944,8 @@ def visit_create_index(self, create): return None def _get_column_info( # noqa: C901 - self, name, data_type, default, is_nullable, schema=None + self, name, data_type, default, is_nullable, table_name, schema=None ): - attype: str = re.sub(r"\(.*\)", "", data_type) charlen = re.search(r"\(([\d,]+)\)", data_type) @@ -835,11 +966,11 @@ def _get_column_info( # noqa: C901 args = () # type: ignore elif attype == "integer": args = () # type: ignore - # elif attype in ("timestamptz", "timetz"): - # kwargs["timezone"] = True + elif attype in ("timestamptz", "timetz"): + kwargs["timezone"] = True # # if charlen: # # kwargs["precision"] = int(charlen) # type: ignore - # args = () # type: ignore + args = () # type: ignore # elif attype in ("timestamp", "time"): # kwargs["timezone"] = False # # if charlen: @@ -904,141 +1035,218 @@ def _get_column_info( # noqa: C901 nullable=is_nullable, default=default, autoincrement=autoincrement, - comment=str(default) + table_name=table_name, + comment=str(default), ) return column_info @reflection.cache def get_models_names(self, connection, schema=None, **kw): - - get_models_sql = sql.text(dedent(""" + get_models_sql = sql.text( + dedent( + """ SELECT model_name FROM models WHERE lower(schema_name) = '%(schema)s' ORDER BY model_name - """ % {'schema': schema.lower()})) + """ + % {"schema": schema.lower()} + ) + ) c = connection.execute(get_models_sql) return [row[0] for row in c] def get_Oauth_names(self, connection, schema=None, **kw): - - get_oauth_sql = sql.text(dedent(""" + get_oauth_sql = sql.text( + dedent( + """ SELECT auth_name from v_catalog.client_auth WHERE auth_method = 'OAUTH' - """ % {'schema': schema})) + """ + % {"schema": schema} + ) + ) print("auth connection", schema.lower()) c = connection.execute(get_oauth_sql) return [row[0] for row in c] - def get_pk_constraint(self, connection,schema: None, **kw): + @lru_cache(maxsize=None) + def fetch_pk_constraint(self, connection, schema): if schema is not None: schema_condition = "lower(table_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" - spk = sql.text(dedent(""" + spk = sql.text( + dedent( + """ SELECT column_name ,table_name FROM v_catalog.primary_keys WHERE lower(table_schema) = '%(schema)s' - """ % {'schema': schema.lower()})) + """ + % {"schema": schema.lower()} + ) + ) pk_columns = [] - + for row in connection.execute(spk): - columns = row['column_name'] - table_name = row['table_name'].lower() - pk_columns.append({'constrained_columns': [columns], 'name': [columns],"tablename":table_name}) + columns = row["column_name"] + table_name = row["table_name"].lower() + pk_columns.append( + { + "constrained_columns": [columns], + "name": [columns], + "table_name": table_name, + } + ) return pk_columns + def get_pk_constraint(self, connection, table_name, schema: 
None, **kw): + pk = self.fetch_pk_constraint(connection, schema) + + pk_columns = [ + prop for prop in pk if prop["table_name"].lower() == table_name.lower() + ] + + if len(pk_columns) == 0: + return None + else: + return pk_columns[0] + + + # @reflection.cache def _get_extra_tags( self, connection, name, schema=None ) -> Optional[Dict[str, str]]: - if schema is not None: schema_condition = "lower(table_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" owner_res = None if name == "table": - table_owner_command = sql.text(dedent(""" + table_owner_command = sql.text( + dedent( + """ SELECT table_name, owner_name FROM v_catalog.tables WHERE %(schema_condition)s - """ % {'schema_condition': schema_condition})) + """ + % {"schema_condition": schema_condition} + ) + ) owner_res = connection.execute(table_owner_command) elif name == "projection": - table_owner_command = sql.text(dedent(""" + table_owner_command = sql.text( + dedent( + """ SELECT projection_name as table_name, owner_name FROM v_catalog.projections WHERE lower(projection_schema) = '%(schema)s' - """ % {'schema': schema.lower()})) + """ + % {"schema": schema.lower()} + ) + ) owner_res = connection.execute(table_owner_command) elif name == "view": - table_owner_command = sql.text(dedent(""" + table_owner_command = sql.text( + dedent( + """ SELECT table_name, owner_name FROM v_catalog.views WHERE %(schema_condition)s - """ % {'schema_condition': schema_condition})) + """ + % {"schema_condition": schema_condition} + ) + ) owner_res = connection.execute(table_owner_command) final_tags = dict() for each in owner_res: - final_tags[each['table_name']] = each['owner_name'] + final_tags[each["table_name"]] = each["owner_name"] return final_tags def _get_ros_count(self, connection, projection_name, schema=None, **kw): if schema is not None: schema_condition = "lower(projection_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" - src = sql.text(dedent(""" + src = sql.text( + dedent( + """ SELECT ros_count FROM v_monitor.projection_storage WHERE lower(projection_name) = '%(table)s' - """ % {'table': projection_name.lower(), 'schema_condition': schema_condition})) + """ + % { + "table": projection_name.lower(), + "schema_condition": schema_condition, + } + ) + ) for data in connection.execute(src): - ros_count = data['ros_count'] + ros_count = data["ros_count"] return ros_count def _get_segmented(self, connection, projection_name, schema=None, **kw): if schema is not None: schema_condition = "lower(projection_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" - sig = sql.text(dedent(""" + sig = sql.text( + dedent( + """ SELECT is_segmented FROM v_catalog.projections WHERE lower(projection_name) = '%(table)s' - """ % {'table': projection_name.lower(), 'schema_condition': schema_condition})) + """ + % { + "table": projection_name.lower(), + "schema_condition": schema_condition, + } + ) + ) - ssk = sql.text(dedent(""" + ssk = sql.text( + dedent( + """ SELECT segment_expression FROM v_catalog.projections WHERE lower(projection_name) = '%(table)s' - """ % {'table': projection_name.lower(), 'schema_condition': schema_condition})) + """ + % { + "table": projection_name.lower(), + "schema_condition": schema_condition, + } + ) + ) is_segmented = "" segmentation_key = "" for data in connection.execute(sig): - is_segmented = str(data['is_segmented']) + 
is_segmented = str(data["is_segmented"]) if is_segmented: for data in connection.execute(ssk): segmentation_key = str(data) @@ -1048,41 +1256,63 @@ def _get_segmented(self, connection, projection_name, schema=None, **kw): def _get_partitionkey(self, connection, projection_name, schema=None, **kw): if schema is not None: schema_condition = "lower(projection_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" partition_key = "" - spk = sql.text(dedent(""" + spk = sql.text( + dedent( + """ SELECT partition_key FROM v_monitor.partitions WHERE lower(projection_name) = '%(table)s' LIMIT 1 - """ % {'table': projection_name.lower(), 'schema_condition': schema_condition})) + """ + % { + "table": projection_name.lower(), + "schema_condition": schema_condition, + } + ) + ) for data in connection.execute(spk): - partition_key = data['partition_key'] + partition_key = data["partition_key"] return partition_key def _get_projectiontype(self, connection, projection_name, schema=None, **kw): if schema is not None: schema_condition = "lower(projection_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" projection_type = [] - spt = sql.text(dedent(""" + spt = sql.text( + dedent( + """ SELECT is_super_projection,is_key_constraint_projection,is_aggregate_projection,has_expressions FROM v_catalog.projections WHERE lower(projection_name) = '%(table)s' AND %(schema_condition)s - """ % {'table': projection_name.lower(), 'schema_condition': schema_condition})) + """ + % { + "table": projection_name.lower(), + "schema_condition": schema_condition, + } + ) + ) for data in connection.execute(spt): - lst = ["is_super_projection", "is_key_constraint_projection", - "is_aggregate_projection", "has_expressions"] + lst = [ + "is_super_projection", + "is_key_constraint_projection", + "is_aggregate_projection", + "has_expressions", + ] i = 0 for d in range(len(data)): @@ -1095,17 +1325,26 @@ def _get_projectiontype(self, connection, projection_name, schema=None, **kw): def _get_numpartitions(self, connection, projection_name, schema=None, **kw): if schema is not None: schema_condition = "lower(projection_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" partition_number = "" - snp = sql.text(dedent(""" + snp = sql.text( + dedent( + """ SELECT Count(ros_id) as np FROM v_monitor.partitions WHERE lower(projection_name) = '%(table)s' - """ % {'table': projection_name.lower(), 'schema_condition': schema_condition})) + """ + % { + "table": projection_name.lower(), + "schema_condition": schema_condition, + } + ) + ) for data in connection.execute(snp): partition_number = data.np @@ -1115,34 +1354,52 @@ def _get_numpartitions(self, connection, projection_name, schema=None, **kw): def _get_projectionsize(self, connection, projection_name, schema=None, **kw): if schema is not None: schema_condition = "lower(projection_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" - sps = sql.text(dedent(""" + sps = sql.text( + dedent( + """ SELECT ROUND(used_bytes // 1024) AS used_bytes from v_monitor.projection_storage WHERE lower(projection_name) = '%(table)s' - """ % {'table': projection_name.lower(), 'schema_condition': schema_condition})) + """ + % { + "table": projection_name.lower(), + "schema_condition": schema_condition, + } + ) + ) projection_size = "" for data in 
connection.execute(sps): - projection_size = data['used_bytes'] + projection_size = data["used_bytes"] return projection_size def _get_ifcachedproj(self, connection, projection_name, schema=None, **kw): if schema is not None: schema_condition = "lower(projection_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" - depot_pin_policy = sql.text(dedent(""" + depot_pin_policy = sql.text( + dedent( + """ SELECT COUNT(*) FROM DEPOT_PIN_POLICIES WHERE lower(object_name) = '%(table)s' - """ % {'table': projection_name.lower(), 'schema_condition': schema_condition})) + """ + % { + "table": projection_name.lower(), + "schema_condition": schema_condition, + } + ) + ) cached_projection = "" @@ -1153,191 +1410,294 @@ def _get_ifcachedproj(self, connection, projection_name, schema=None, **kw): cached_projection = False return cached_projection - @reflection.cache - def get_projection_comment(self, connection, schema=None, **kw): - - src = sql.text(dedent(""" + @lru_cache(maxsize=None) + def fetch_projection_comments(self,connection,schema): + src = sql.text( + dedent( + """ SELECT ros_count , LOWER(projection_name) FROM v_monitor.projection_storage WHERE projection_schema = '%(schema)s' - """ % {'schema': schema})) - - projection_type =sql.text(dedent(""" + """ + % {"schema": schema} + ) + ) + + projection_type = sql.text( + dedent( + """ SELECT DISTINCT is_super_projection,is_key_constraint_projection,is_aggregate_projection,has_expressions ,LOWER(projection_name) FROM v_catalog.projections WHERE projection_schema = '%(schema)s' - """ % {'schema': schema})) - - - is_segmented = sql.text(dedent(""" + """ + % {"schema": schema} + ) + ) + + is_segmented = sql.text( + dedent( + """ SELECT is_segmented , segment_expression , LOWER(projection_name) FROM v_catalog.projections WHERE projection_schema = '%(schema)s' - """ % {'schema': schema})) - - - partition_key = sql.text(dedent(""" + """ + % {"schema": schema} + ) + ) + + partition_key = sql.text( + dedent( + """ SELECT DISTINCT LOWER(projection_name) , partition_key FROM v_monitor.partitions WHERE table_schema = '%(schema)s' - """ % {'schema': schema})) - - partition_num = sql.text(dedent(""" + """ + % {"schema": schema} + ) + ) + + partition_num = sql.text( + dedent( + """ SELECT COUNT(ros_id) as Partition_Size , LOWER(projection_name) FROM v_monitor.partitions WHERE table_schema = '%(schema)s' GROUP BY projection_name - """ % {'schema': schema})) - - projection_size = sql.text(dedent(""" + """ + % {"schema": schema} + ) + ) + + projection_size = sql.text( + dedent( + """ SELECT used_bytes , LOWER(projection_name) from v_monitor.projection_storage WHERE projection_schema = '%(schema)s' - """ % {'schema': schema})) - - projection_cache = sql.text(dedent(""" + """ + % {"schema": schema} + ) + ) + + projection_cache = sql.text( + dedent( + """ SELECT COUNT(*) , object_name FROM DEPOT_PIN_POLICIES WHERE schema_name = '%(schema)s' GROUP BY object_name - """ % {'schema': schema})) - + """ + % {"schema": schema} + ) + ) + projection_comment = [] - + ros_count = {} for data in connection.execute(src): # ros_count = data['ros_count'] - ros_count = {"ROS_Count": data[0] , "projection_name" : data[1]} + ros_count = {"ROS_Count": data[0], "projection_name": data[1]} projection_comment.append(ros_count) - - - - lst = ["is_super_projection", "is_key_constraint_projection", - "is_aggregate_projection", "has_expressions"] - - for ptype in connection.execute(projection_type): - + + lst = [ + 
"is_super_projection", + "is_key_constraint_projection", + "is_aggregate_projection", + "has_expressions", + ] + + + for ptype in connection.execute(projection_type): for i, value in enumerate(ptype): if value is True: for a in projection_comment: - if a['projection_name'] == ptype[4]: + if a["projection_name"] == ptype[4]: if "Projection_Type" in a: - a["Projection_Type"] = a["Projection_Type"] + ", "+str(lst[i]) + a["Projection_Type"] = ( + a["Projection_Type"] + ", " + str(lst[i]) + ) else: a["Projection_Type"] = str(lst[i]) - - for projection_segment in connection.execute(is_segmented): + for projection_segment in connection.execute(is_segmented): for a in projection_comment: - if a['projection_name'] == projection_segment[2]: + if a["projection_name"] == projection_segment[2]: a["is_segmented"] = str(projection_segment[0]) a["Segmentation_key"] = str(projection_segment[1]) - + for partion_keys in connection.execute(partition_key): - for a in projection_comment: - if a['projection_name'] == partion_keys[0]: + if a["projection_name"] == partion_keys[0]: a["Partition_Key"] = str(partion_keys[1]) - - - projection_size_dict={} + projection_size_dict = {} for projection_sizes in connection.execute(projection_size): if projection_sizes[1] not in projection_size_dict: projection_size_dict[projection_sizes[1]] = projection_sizes[0] / 1024 else: projection_size_dict[projection_sizes[1]] += projection_sizes[0] / 1024 - - + for a in projection_comment: - if a['projection_name'] in projection_size_dict: - a['projection_size'] = str(int(projection_size_dict[a['projection_name']])) + " KB" - + if a["projection_name"] in projection_size_dict: + a["projection_size"] = ( + str(int(projection_size_dict[a["projection_name"]])) + " KB" + ) + else: - a['projection_size'] = "0 KB" - - + a["projection_size"] = "0 KB" + for partition_number in connection.execute(partition_num): - - for a in projection_comment: - if a['projection_name'].lower() == partition_number[1]: + if a["projection_name"].lower() == partition_number[1]: + a["Partition_Size"] = str(partition_number["Partition_Size"]) - a["Partition_Size"] = str(partition_number['Partition_Size']) - for projection_cached in connection.execute(projection_cache): - - for a in projection_comment: - if projection_cached[0] > 0: a["Projection_Cached"] = True else: a["Projection_Cached"] = False - - return {"text": "Vertica physically stores table data in projections, \ + + return projection_comment + + + def get_projection_comment(self, connection, projection, schema=None, **kw): + try: + + comments = self.fetch_projection_comments(connection,schema) + projection_comments = [prop for prop in comments if prop["projection_name"].lower() == projection.lower()] + + projection_properties={} + + if 'ROS_Count' in projection_comments[0]: + projection_properties["ROS_Count"] = str(projection_comments[0]['ROS_Count']) + else: + projection_properties["ROS_Count"] = "Not Available" + + if 'Projection_Type' in projection_comments[0]: + projection_properties['Projection_Type']= str(projection_comments[0]['Projection_Type']) + else : + projection_properties['Projection_Type']= "Not Available" + + if 'is_segmented' in projection_comments[0]: + projection_properties['is_segmented'] = str(projection_comments[0]['is_segmented']) + else: + projection_properties['is_segmented'] = "Not Available" + + if 'Segmentation_key' in projection_comments[0]: + projection_properties['Segmentation_key'] = str(projection_comments[0]['Segmentation_key']) + else: + 
projection_properties["Segmentation_key"] = "Not Available"
+
+            if "projection_size" in projection_comments[0]:
+                projection_properties["projection_size"] = str(projection_comments[0]["projection_size"])
+            else:
+                projection_properties["projection_size"] = "0 KB"
+
+            if "Partition_Key" in projection_comments[0]:
+                projection_properties["Partition_Key"] = str(projection_comments[0]["Partition_Key"])
+            else:
+                projection_properties["Partition_Key"] = "Not Available"
+
+            if "Partition_Size" in projection_comments[0]:
+                projection_properties["Partition_Size"] = str(projection_comments[0]["Partition_Size"])
+            else:
+                projection_properties["Partition_Size"] = "0"
+
+            if "Projection_Cached" in projection_comments[0]:
+                projection_properties["Projection_Cached"] = str(projection_comments[0]["Projection_Cached"])
+            else:
+                projection_properties["Projection_Cached"] = "False"
+
+        except Exception as e:
+            # fall back to empty properties so the return below cannot raise
+            # NameError when the cached lookup fails or no projection matches
+            projection_properties = {}
+            logger.warning(f"Failed to get projection comment for {projection}: {e}")
+
+        return {
+            "text": "Vertica physically stores table data in projections, \
 which are collections of table columns. Projections store data in a format that optimizes query execution \
 For more info on projections and corresponding properties check out the Vertica Docs: https://www.vertica.com/docs",
-            "properties": projection_comment }
+            "properties": projection_properties,
+        }
 
     @reflection.cache
     def get_model_comment(self, connection, model_name, schema=None, **kw):
-
         if schema is not None:
             schema_condition = "lower(schema_name) = '%(schema)s'" % {
-                'schema': schema.lower()}
+                "schema": schema.lower()
+            }
         else:
             schema_condition = "1"
 
-        model_used_by = sql.text(dedent("""
+        model_used_by = sql.text(
+            dedent(
+                """
             select owner_name from models
             where model_name = '%(model)s'
-        """ % {'model': model_name}))
+        """
+                % {"model": model_name}
+            )
+        )
 
-        model_attr_name = sql.text(dedent("""
+        model_attr_name = sql.text(
+            dedent(
+                """
                 SELECT
                 GET_MODEL_ATTRIBUTE
                 ( USING PARAMETERS model_name='%(schema)s.%(model)s');
-        """ % {'model': model_name, 'schema': schema.lower()}))
+        """
+                % {"model": model_name, "schema": schema.lower()}
+            )
+        )
 
         used_by = ""
         attr_name = []
         attr_details = []
         for data in connection.execute(model_used_by):
-            used_by = data['owner_name']
+            used_by = data["owner_name"]
 
         for data in connection.execute(model_attr_name):
-
             attributes = {
-                "attr_name": data[0], "attr_fields": data[1], "#_of_rows": data[2]}
+                "attr_name": data[0],
+                "attr_fields": data[1],
+                "#_of_rows": data[2],
+            }
             attr_name.append(attributes)
 
         attributes_details = []
         for data in attr_name:
             attr_details_dict = dict()
-            attr_names = data['attr_name']
-            attr_fields = str(data['attr_fields']).split(',')
+            attr_names = data["attr_name"]
+            attr_fields = str(data["attr_fields"]).split(",")
 
-            get_attr_details = sql.text(dedent("""
+            get_attr_details = sql.text(
+                dedent(
+                    """
                 SELECT
                 GET_MODEL_ATTRIBUTE
                 ( USING PARAMETERS model_name='%(schema)s.%(model)s', attr_name='%(attr_name)s');
-            """ % {'model': model_name, 'schema': schema.lower(), 'attr_name': attr_names}))
+            """
+                    % {
+                        "model": model_name,
+                        "schema": schema.lower(),
+                        "attr_name": attr_names,
+                    }
+                )
+            )
 
             value_final = dict()
             attr_details_dict = {"attr_name": attr_names}
 
             for data in connection.execute(get_attr_details):
-
                 if len(attr_fields) > 1:
-
                     for index, each in enumerate(attr_fields):
                         if each not in value_final:
                             value_final[each] = list()
@@ -1351,16 +1711,22 @@ def get_model_comment(self, connection, model_name, schema=None, **kw):
             attr_details_dict.update(value_final)
             attributes_details.append(attr_details_dict)
 
-        return {"text": "Vertica 
provides a number of machine learning functions for performing in-database analysis. \ + return { + "text": "Vertica provides a number of machine learning functions for performing in-database analysis. \ These functions perform data preparation, model training, and predictive tasks. \ These properties shows the Model attributes and Specifications in the current schema.", - "properties": {"used_by": str(used_by), - "Model Attributes": str(attr_name), "Model Specifications": str(attributes_details)}} + "properties": { + "used_by": str(used_by), + "Model Attributes": str(attr_name), + "Model Specifications": str(attributes_details), + }, + } @reflection.cache def get_oauth_comment(self, connection, oauth, schema=None, **kw): - - get_oauth_comments = sql.text(dedent(""" + get_oauth_comments = sql.text( + dedent( + """ SELECT auth_oid , is_auth_enabled, is_fallthrough_enabled, @@ -1370,12 +1736,13 @@ def get_oauth_comment(self, connection, oauth, schema=None, **kw): from v_catalog.client_auth WHERE auth_method = 'OAUTH' - """)) + """ + ) + ) client_id = "" client_secret = "" for data in connection.execute(get_oauth_comments): - - whole_data = str(data['auth_parameters']).split(", ") + whole_data = str(data["auth_parameters"]).split(", ") client_id_data = whole_data[0].split("=") if client_id_data: # client_data.update({client_id_data[0] : client_id_data[1]}) @@ -1396,274 +1763,270 @@ def get_oauth_comment(self, connection, oauth, schema=None, **kw): # client_data.update({client_secret_data[0] : client_secret_data[1]}) introspect_url = client_introspect_url[1] - auth_oid = data['auth_oid'] - is_auth_enabled = data['is_auth_enabled'] - auth_priority = data['auth_priority'] - address_priority = data['address_priority'] - is_fallthrough_enabled = data['is_fallthrough_enabled'] + auth_oid = data["auth_oid"] + is_auth_enabled = data["is_auth_enabled"] + auth_priority = data["auth_priority"] + address_priority = data["address_priority"] + is_fallthrough_enabled = data["is_fallthrough_enabled"] - return {"text": "Vertica supports OAUTH based authentication. \ + return { + "text": "Vertica supports OAUTH based authentication. \ These properties are only visible in Datahub if you have access to the authorization table in Vertica. 
\ - All the properties shown here are what Vertica uses for a client connecting via OAUTH.", "properties": {"discovery_url": str(discovery_url), - "client_id": str(client_id), "introspect_url": str(introspect_url), "auth_oid ": str(auth_oid), "client_secret": str(client_secret), - "is_auth_enabled": str(is_auth_enabled), "auth_priority": str(auth_priority), "address_priority": str(address_priority), "is_fallthrough_enabled": str(is_fallthrough_enabled), }} - -########################################################## new code ############################################################ - - def get_table_owner(self, connection, schema=None, **kw): - if schema is not None: - schema_condition = "lower(table_schema) = '%(schema)s'" % { - 'schema': schema.lower()} - else: - schema_condition = "1" - - sct = sql.text(dedent(""" - SELECT table_name ,owner_name - FROM v_catalog.tables - where lower(table_schema) = '%(schema)s' - """ % {'schema': schema.lower()})) - - owner_info = [] - for row in connection.execute(sct): - owner_info.append(row) - # print(row) - - - return owner_info + All the properties shown here are what Vertica uses for a client connecting via OAUTH.", + "properties": { + "discovery_url": str(discovery_url), + "client_id": str(client_id), + "introspect_url": str(introspect_url), + "auth_oid ": str(auth_oid), + "client_secret": str(client_secret), + "is_auth_enabled": str(is_auth_enabled), + "auth_priority": str(auth_priority), + "address_priority": str(address_priority), + "is_fallthrough_enabled": str(is_fallthrough_enabled), + }, + } + + def get_all_columns(self, connection, table, schema=None, **kw): + columns = self.fetch_table_columns(connection, schema) + table_columns = [ + prop for prop in columns if prop["table_name"].lower() == table.lower() + ] + return table_columns @reflection.cache - def get_all_columns(self, connection,schema=None, **kw): + def get_columns(self, connection, table_name, schema=None, **kw): if schema is not None: schema_condition = "lower(table_schema) = '%(schema)s'" % { 'schema': schema.lower()} else: schema_condition = "1" - + s = sql.text(dedent(""" - SELECT column_name, data_type, column_default, is_nullable,table_name + SELECT column_name, data_type, column_default, is_nullable FROM v_catalog.columns - where lower(table_schema) = '%(schema)s' - - """ % {'schema': schema.lower()})) + WHERE lower(table_name) = '%(table)s' + AND %(schema_condition)s + UNION ALL + SELECT column_name, data_type, '' as column_default, true as is_nullable + FROM v_catalog.view_columns + WHERE lower(table_name) = '%(table)s' + AND %(schema_condition)s + UNION ALL + SELECT projection_column_name,data_type,'' as column_default, true as is_nullable + FROM PROJECTION_COLUMNS + WHERE lower(projection_name) = '%(table)s' + AND %(schema_condition)s + """ % {'table': table_name.lower(), 'schema_condition': schema_condition})) + spk = sql.text(dedent(""" + SELECT column_name + FROM v_catalog.primary_keys + WHERE lower(table_name) = '%(table)s' + AND constraint_type = 'p' + AND %(schema_condition)s + """ % {'table': table_name.lower(), 'schema_condition': schema_condition})) + + pk_columns = [x[0] for x in connection.execute(spk)] columns = [] - for row in connection.execute(s): name = row.column_name dtype = row.data_type.lower() + primary_key = name in pk_columns default = row.column_default nullable = row.is_nullable - tablename = row.table_name.lower() - column_info = self._get_all_column_info( + column_info = self._get_column_info( name, dtype, default, nullable, + 
table_name, schema, - tablename ) - # print(column_info) - # column_info.update({'primary_key': primary_key}) + column_info.update({'primary_key': primary_key}) columns.append(column_info) - return columns - @reflection.cache - def _get_all_column_info( # noqa: C901 - self, name, data_type, default, is_nullable, schema=None ,table_name=None - ): - attype: str = re.sub(r"\(.*\)", "", data_type) + ########################################################## new code ############################################################ - charlen = re.search(r"\(([\d,]+)\)", data_type) - if charlen: - charlen = charlen.group(1) # type: ignore - args = re.search(r"\((.*)\)", data_type) - if args and args.group(1): - args = tuple(re.split(r"\s*,\s*", args.group(1))) # type: ignore + @lru_cache(maxsize=None) + def fetch_table_owner(self, connection, schema): + if schema is not None: + schema_condition = "lower(table_schema) = '%(schema)s'" % { + "schema": schema.lower() + } else: - args = () # type: ignore - kwargs: Dict[str, Any] = {} + schema_condition = "1" - if attype == "numeric": - if charlen: - prec, scale = charlen.split(",") # type: ignore - args = (int(prec), int(scale)) # type: ignore - else: - args = () # type: ignore - elif attype == "integer": - args = () # type: ignore - elif attype in ("timestamptz", "timetz"): - kwargs["timezone"] = True - # if charlen: - # kwargs["precision"] = int(charlen) # type: ignore - args = () # type: ignore - elif attype in ("timestamp", "time"): - kwargs["timezone"] = False - # if charlen: - # kwargs["precision"] = int(charlen) # type: ignore - args = () # type: ignore - elif attype.startswith("interval"): - field_match = re.match(r"interval (.+)", attype, re.I) - # if charlen: - # kwargs["precision"] = int(charlen) # type: ignore - if field_match: - kwargs["fields"] = field_match.group(1) # type: ignore - attype = "interval" - args = () # type: ignore - elif attype == "date": - args = () # type: ignore - elif charlen: - args = (int(charlen),) # type: ignore + sct = sql.text( + dedent( + """ + SELECT table_name ,owner_name + FROM v_catalog.tables + where lower(table_schema) = '%(schema)s' + """ + % {"schema": schema.lower()} + ) + ) - while True: - if attype.upper() in self.ischema_names: - coltype = self.ischema_names[attype.upper()] - break - else: - coltype = None - break + owner_info = [] + for row in connection.execute(sct): + owner_info.append( + {"table_name": row.table_name, "owner_name": row.owner_name} + ) - self.ischema_names["UUID"] = UUID - self.ischema_names["TIMESTAMP"] = TIMESTAMP_WITH_PRECISION - self.ischema_names["TIMESTAMPTZ"] = TIMESTAMP_WITH_TIMEZONE - self.ischema_names["TIMETZ"] = TIME_WITH_TIMEZONE + # owner_info = row["owner_name"] - if coltype: - coltype = coltype(*args, **kwargs) - else: - util.warn("Did not recognize type '%s' of column '%s'" % - (attype, name)) - coltype = sqltypes.NULLTYPE - # adjust the default value - autoincrement = False - if default is not None: - match = re.search(r"""(nextval\(')([^']+)('.*$)""", default) - if match is not None: - if issubclass(coltype._type_affinity, sqltypes.Integer): - autoincrement = True - # the default is related to a Sequence - sch = schema - if "." not in match.group(2) and sch is not None: - # unconditionally quote the schema name. this could - # later be enhanced to obey quoting rules / - # "quote schema" - default = ( - match.group(1) - + ('"%s"' % sch) - + "." 
- + match.group(2) - + match.group(3) - ) + return owner_info - column_info = dict( - name=name, - type=coltype, - nullable=is_nullable, - default=default, - autoincrement=autoincrement, - comment=str(default), - tablename = str(table_name) - ) + def get_table_owner(self, connection, table, schema=None, **kw): + owner = self.fetch_table_owner(connection, schema) - return column_info - - @reflection.cache - def get_all_view_columns(self, connection, schema=None, **kw): - - s = sql.text(dedent(""" + + owner_info = [ + prop for prop in owner if prop["table_name"].lower() == table.lower() + ] + table_owner = owner_info[0]['owner_name'] + + return table_owner + + + @lru_cache(maxsize=None) + def fetch_view_columns(self, connection, schema): + s = sql.text( + dedent( + """ SELECT column_name, data_type, '' as column_default, true as is_nullable,lower(table_name) as table_name FROM v_catalog.view_columns where lower(table_schema) = '%(schema)s' - """ % {'schema': schema.lower()})) + """ + % {"schema": schema.lower()} + ) + ) columns = [] - + for row in connection.execute(s): name = row.column_name dtype = row.data_type.lower() default = row.column_default nullable = row.is_nullable - tablename = row.table_name.lower() + table_name = row.table_name.lower() - column_info = self._get_all_column_info( - name, - dtype, - default, - nullable, - schema, - tablename + column_info = self._get_column_info( + name, dtype, default, nullable,table_name, schema ) # print(column_info) # column_info.update({'primary_key': primary_key}) columns.append(column_info) - - - + return columns - def get_view_comment(self, connection,schema=None, **kw): + + + def get_view_columns(self, connection, view, schema=None, **kw): + + columns = self.fetch_view_columns(connection, schema) + table_columns = [ + prop for prop in columns if prop["table_name"].lower() == view.lower() + ] + return table_columns + + @lru_cache(maxsize=None) + def fetch_view_comment(self, connection, schema): if schema is not None: schema_condition = "lower(table_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" - - sct = sql.text(dedent(""" + sct = sql.text( + dedent( + """ SELECT create_time , table_name FROM v_catalog.views where lower(table_schema) = '%(schema)s' - """ % {'schema': schema.lower()})) + """ + % {"schema": schema.lower()} + ) + ) - columns = [] + comments = [] for row in connection.execute(sct): - columns.append({'create_time': str(row[0]), 'table_name': row[1]}) + comments.append({"create_time": str(row[0]), "table_name": row[1]}) + + return comments + + + def get_view_comment(self, connection, view, schema=None, **kw): + + comments = self.fetch_view_comment(connection, schema ) + + view_comments = [prop for prop in comments if prop["table_name"].lower() == view.lower()] + + view_properties = { + "create_time": view_comments[0]['create_time'], + } - return {"text": "References the properties of a native table in Vertica. \ + return { + "text": "References the properties of a native table in Vertica. \ Vertica physically stores table data in projections, which are collections of table columns. \ Projections store data in a format that optimizes query execution. \ In order to query or perform any operation on a Vertica table, the table must have one or more projections associated with it. 
", - "properties": columns } - - @reflection.cache - def get_view_owner(self, connection, schema=None, **kw): + "properties": view_properties, + } + + + @lru_cache(maxsize=None) + def fetch_view_owner(self, connection, schema): if schema is not None: schema_condition = "lower(table_schema) = '%(schema)s'" % { - 'schema': schema.lower()} + "schema": schema.lower() + } else: schema_condition = "1" - sct = sql.text(dedent(""" + sct = sql.text( + dedent( + """ SELECT table_name ,owner_name FROM v_catalog.views where lower(table_schema) = '%(schema)s' - """ % {'schema': schema.lower()})) + """ + % {"schema": schema.lower()} + ) + ) - owner_info = [] for row in connection.execute(sct): - owner_info.append(row) - # print(row) + owner_info.append( + {"table_name": row.table_name, "owner_name": row.owner_name} + ) + + # owner_info = row["owner_name"] - return owner_info - - def _populate_view_lineage(self, connection, schema: str) -> None: - """Collects upstream and downstream lineage information for views . - - Args: - view (str): name of the view - - """ + def get_view_owner(self, connection, view, schema=None, **kw): + + owner = self.fetch_view_owner(connection, schema) + + + owner_info = [ + prop for prop in owner if prop["table_name"].lower() == view.lower() + ] + view_owner = owner_info[0]['owner_name'] - self.view_lineage_map: Optional[Dict[str, List[Tuple[str, str, str]]]] = None - + return view_owner + + @lru_cache(maxsize=None) + def fetch_view_lineage(self, connection,schema) -> None: + view_upstream_lineage_query = sql.text( dedent( @@ -1676,57 +2039,83 @@ def _populate_view_lineage(self, connection, schema: str) -> None: refrence_table = [] for data in connection.execute(view_upstream_lineage_query): # refrence_table.append(data) - refrence_table.append({"reference_table_name" : data["reference_table_name"] , "reference_table_schema":data["reference_table_schema"] , - "view_name":data["table_name"], "table_schema":data["table_schema"]} ) + refrence_table.append( + { + "reference_table_name": data["reference_table_name"], + "reference_table_schema": data["reference_table_schema"], + "view_name": data["table_name"], + "table_schema": data["table_schema"], + } + ) # refrence_table = data["reference_table_name"] - num_edges: int = 0 + return refrence_table + + + + + def _populate_view_lineage(self, connection, view, schema: str) -> None: + """Collects upstream and downstream lineage information for views . + + Args: + view (str): name of the view + + """ + + + view_lineage_map: Optional[Dict[str, List[Tuple[str, str, str]]]] = None + + refrence_table = self.fetch_view_lineage(connection, schema) + try: - self.view_lineage_map = defaultdict(list) + view_lineage_map = defaultdict(list) for lineage in refrence_table: - downstream = f"{lineage['table_schema']}.{lineage['view_name']}" - + downstream = f"{lineage['table_schema']}.{lineage['view_name']}" + upstream = f"{lineage['reference_table_schema']}.{lineage['reference_table_name']}" view_upstream: str = upstream view_name: str = downstream - self.view_lineage_map[view_name].append( + + view_lineage_map[view_name].append( # (, , ) (view_upstream, "[]", "[]") ) + - - num_edges += 1 - - return self.view_lineage_map + return view_lineage_map except Exception as e: - self.warn( - logger, - "view_upstream_lineage", - "Extracting the upstream & Downstream view lineage from vertica failed." - + f"Please check your permissions. 
Continuing...\nError was {e}.",
-            )
+            logger.warning(
+                "view_upstream_lineage: extracting the view lineage from Vertica "
+                f"failed. Please check your permissions. Continuing...\nError was {e}."
+            )
 
-        logger.info(
-            f"A total of {num_edges} View upstream edges found found for {schema}"
-        )
 
-    @reflection.cache
-    def get_all_projection_columns(self, connection, schema=None, **kw):
-
-        s = sql.text(dedent("""
+    @lru_cache(maxsize=None)
+    def fetch_projection_columns(self, connection, schema):
+        s = sql.text(
+            dedent(
+                """
            SELECT projection_column_name, data_type, '' as column_default, true as is_nullable,lower(projection_name) as projection_name
            FROM PROJECTION_COLUMNS
            where lower(table_schema) = '%(schema)s'
-        """ % {'schema': schema.lower()}))
+        """
+                % {"schema": schema.lower()}
+            )
+        )
 
         columns = []
 
+        for row in connection.execute(s):
             name = row.projection_column_name
             dtype = row.data_type.lower()
@@ -1734,48 +2123,59 @@ def get_all_projection_columns(self, connection, schema=None, **kw):
             nullable = row.is_nullable
             tablename = row.projection_name.lower()
 
-            column_info = self._get_all_column_info(
-                name,
-                dtype,
-                default,
-                nullable,
-                schema,
-                tablename
+            column_info = self._get_column_info(
+                name, dtype, default, nullable, tablename, schema,
             )
-            # print(column_info)
-            # column_info.update({'primary_key': primary_key})
             columns.append(column_info)
-
+        return columns
 
-    def get_projection_owner(self, connection,schema=None, **kw):
-        projection_owner_command = sql.text(dedent("""
+    def get_projection_columns(self, connection, projection, schema=None, **kw):
+        columns = self.fetch_projection_columns(connection, schema)
+        projection_columns = [
+            prop for prop in columns if prop["table_name"].lower() == projection.lower()
+        ]
+        return projection_columns
+
+    @lru_cache(maxsize=None)
+    def fetch_projection_owner(self, connection, schema):
+        projection_owner_command = sql.text(
+            dedent(
+                """
            SELECT projection_name as table_name, owner_name
            FROM v_catalog.projections
            WHERE lower(projection_schema) = '%(schema)s'
-        """ % {'schema': schema.lower()}))
-
+        """
+                % {"schema": schema.lower()}
+            )
+        )
+        owner_info = []
         for row in connection.execute(projection_owner_command):
             owner_info.append(row)
-            # print(row)
 
         return owner_info
 
-    def _populate_projection_lineage(self, connection, schema: str) -> None:
-        """Collects upstream and downstream lineage information for views .
-
-        Args:
-            view (str): name of the view
-
-        """
-        self.projection_lineage_map: Optional[Dict[str, List[Tuple[str, str, str]]]] = None
-
-
+    def get_projection_owner(self, connection, projection, schema=None, **kw):
+        owner = self.fetch_projection_owner(connection, schema)
+        projections_owner = [
+            prop for prop in owner if prop[0].lower() == projection.lower()
+        ]
+        projection_owner = projections_owner[0][1]
+        return projection_owner
+
+    @lru_cache(maxsize=None)
+    def fetch_populate_projection_lineage(self, connection, schema):
+        refrence_table = []
         projection_upstream_lineage_query = sql.text(
             dedent(
                 """
@@ -1783,36 +2183,50 @@ def _populate_projection_lineage(self, connection, schema: str) -> None:
                % {"schema": schema}
             )
         )
-
-        refrence_table = []
         for data in connection.execute(projection_upstream_lineage_query):
-            # refrence_table.append(data)
-            refrence_table.append({"basename" : data["basename"] , "schemaname":data["schemaname"] ,
-                "name":data["name"], } )
-            # refrence_table = data["reference_table_name"]
+            refrence_table.append(
+                {
+                    "basename": data["basename"],
+                    "schemaname": data["schemaname"],
+                    "name": data["name"],
+                }
+            )
 
-        num_edges: int = 0
+        return refrence_table
+
+    def _populate_projection_lineage(
+        self, connection, projection, schema: str
+    ) -> Optional[Dict[str, List[Tuple[str, str, str]]]]:
+        """Collects upstream and downstream lineage information for projections.
+
+        Args:
+            projection (str): name of the projection
+
+        """
+        projection_lineage_map: Optional[
+            Dict[str, List[Tuple[str, str, str]]]
+        ] = None
 
+        refrence_table = self.fetch_populate_projection_lineage(connection, schema)
         try:
-            self.projection_lineage_map = defaultdict(list)
+            projection_lineage_map = defaultdict(list)
             for lineage in refrence_table:
                 downstream = f"{lineage['schemaname']}.{lineage['name']}"
-
                 upstream = f"{lineage['schemaname']}.{lineage['basename']}"
                 view_upstream: str = upstream
                 view_name: str = downstream
-                self.projection_lineage_map[view_name].append(
+                projection_lineage_map[view_name].append(
                     # (<upstream object>, <json list of upstream columns>, <json list of downstream columns>)
                     (view_upstream, "[]", "[]")
                 )
-
-
-            num_edges += 1
-
-            return self.projection_lineage_map
+            return projection_lineage_map
 
         except Exception as e:
             self.warn(
@@ -1821,7 +2235,3 @@ def _populate_projection_lineage(self, connection, schema: str) -> None:
                 "Extracting the upstream & Downstream view lineage from vertica failed."
                 + f"Please check your permissions. Continuing...\nError was {e}.",
            )
-
-        logger.info(
-            f"A total of {num_edges} View upstream edges found found for {schema}"
-        )
\ No newline at end of file
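
A note on the caching pattern used throughout this patch: functools.lru_cache on a bound method keys its entries on every positional argument, including self and the live connection object, and maxsize=None means entries are never evicted for the life of the dialect. The sketch below is illustrative only (the class and query are simplified stand-ins, not code from this patch) and shows the fetch-once, filter-per-object shape that the new fetch_*/get_* pairs follow:

    from functools import lru_cache

    class CatalogCacheSketch:
        """Minimal sketch of the fetch-once, filter-per-object pattern."""

        @lru_cache(maxsize=None)  # unbounded: entries live as long as this object
        def fetch_table_owner(self, connection, schema):
            # one catalog round-trip per (self, connection, schema) key; a
            # different connection object is a cache miss, so reuse connections
            rows = connection.execute(
                "SELECT table_name, owner_name FROM v_catalog.tables "
                "WHERE lower(table_schema) = '%s'" % schema.lower()
            )
            # returned as a tuple so cached results cannot be mutated by callers
            return tuple({"table_name": r[0], "owner_name": r[1]} for r in rows)

        def get_table_owner(self, connection, table, schema=None, **kw):
            owners = self.fetch_table_owner(connection, schema)
            match = [o for o in owners if o["table_name"].lower() == table.lower()]
            return match[0]["owner_name"] if match else None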
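
The get_projection_comment wrapper above fills in defaults for missing keys with a long if/else ladder. The same normalization can be written as a table of defaults plus dict.get; this is a sketch of the equivalent idea, not code from the patch:

    PROJECTION_DEFAULTS = {
        "ROS_Count": "Not Available",
        "Projection_Type": "Not Available",
        "is_segmented": "Not Available",
        "Segmentation_key": "Not Available",
        "projection_size": "0 KB",
        "Partition_Key": "Not Available",
        "Partition_Size": "0",
        "Projection_Cached": "False",
    }

    def build_projection_properties(comment: dict) -> dict:
        # str() mirrors the explicit casts in get_projection_comment
        return {
            key: str(comment.get(key, default))
            for key, default in PROJECTION_DEFAULTS.items()
        }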
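
For anyone consuming the lineage helpers: _populate_view_lineage and _populate_projection_lineage return a defaultdict(list) keyed by "schema.object", and each value is a list of (upstream, "[]", "[]") tuples in which the "[]" entries are literal empty JSON arrays reserved for column-level lineage. A toy example of the shape, with the edge itself made up for illustration:

    from collections import defaultdict

    view_lineage_map = defaultdict(list)
    # one edge, shaped the way _populate_view_lineage records it
    view_lineage_map["public.sampleview"].append(("public.sampletemp", "[]", "[]"))

    for view, edges in view_lineage_map.items():
        for upstream, _, _ in edges:
            print(f"{upstream} -> {view}")  # public.sampletemp -> public.sampleview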
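
Column reflection reduces Vertica type strings such as numeric(10,2) to a type name plus arguments with two regular expressions before looking the name up in ischema_names. A standalone sketch of that parsing step (simplified from the column-info helper; the function name here is ours):

    import re

    def parse_vertica_type(data_type: str):
        # strip the parenthesised arguments to get the bare type name ...
        base = re.sub(r"\(.*\)", "", data_type)
        # ... then pull the numeric arguments back out separately
        match = re.search(r"\(([\d,]+)\)", data_type)
        args = tuple(int(a) for a in match.group(1).split(",")) if match else ()
        return base, args

    assert parse_vertica_type("numeric(10,2)") == ("numeric", (10, 2))
    assert parse_vertica_type("varchar(80)") == ("varchar", (80,))
    assert parse_vertica_type("integer") == ("integer", ())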
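
End to end, the new per-object getters hang off the dialect instance once an engine exists. A usage sketch against the VMart sample objects used by the tests; the URL, credentials, and host are placeholders to adapt:

    from sqlalchemy import create_engine

    engine = create_engine("vertica+vertica_python://dbadmin:password@localhost:5433/vmart")

    with engine.connect() as connection:
        dialect = engine.dialect
        # the first call per schema pays for the catalog query;
        # subsequent calls on the same connection are served from the lru_cache
        print(dialect.get_table_owner(connection, "customer_dimension", schema="public"))
        print(dialect.get_view_columns(connection, "sampleview", schema="public"))
        print(dialect.get_projection_owner(connection, "customer_dimension_super", schema="public"))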