From dd3d14964bc8a9ba5ce9e01d44f93ab07117756a Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Tue, 5 Dec 2023 17:42:29 +0100 Subject: [PATCH] fix(ingest/redshift): Identify materialized views properly + fix connection args support (#9368) --- .../docs/sources/redshift/redshift_recipe.yml | 4 +- metadata-ingestion/setup.py | 8 +- .../ingestion/source/redshift/config.py | 23 ++++- .../ingestion/source/redshift/query.py | 18 +++- .../ingestion/source/redshift/redshift.py | 6 +- .../source/redshift/redshift_schema.py | 98 +++++++++++++------ 6 files changed, 109 insertions(+), 48 deletions(-) diff --git a/metadata-ingestion/docs/sources/redshift/redshift_recipe.yml b/metadata-ingestion/docs/sources/redshift/redshift_recipe.yml index be704e6759d41..a561405d3de47 100644 --- a/metadata-ingestion/docs/sources/redshift/redshift_recipe.yml +++ b/metadata-ingestion/docs/sources/redshift/redshift_recipe.yml @@ -40,8 +40,8 @@ source: options: connect_args: - sslmode: "prefer" # or "require" or "verify-ca" - sslrootcert: ~ # needed to unpin the AWS Redshift certificate + # check all available options here: https://pypi.org/project/redshift-connector/ + ssl_insecure: "false" # Specifies if IDP hosts server certificate will be verified sink: # sink configs diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 4f5f09fb148fa..416b255fb763f 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -181,8 +181,8 @@ redshift_common = { # Clickhouse 0.8.3 adds support for SQLAlchemy 1.4.x "sqlalchemy-redshift>=0.8.3", - "psycopg2-binary", "GeoAlchemy2", + "redshift-connector", *sqllineage_lib, *path_spec_common, } @@ -365,11 +365,7 @@ | {"psycopg2-binary", "pymysql>=1.0.2"}, "pulsar": {"requests"}, "redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib, - "redshift": sql_common - | redshift_common - | usage_common - | {"redshift-connector"} - | sqlglot_lib, + "redshift": sql_common | redshift_common | usage_common | sqlglot_lib, "s3": {*s3_base, *data_lake_profiling}, "gcs": {*s3_base, *data_lake_profiling}, "sagemaker": aws_common, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py index 95038ef2c6212..51ad8a050adc2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py @@ -82,7 +82,7 @@ class RedshiftConfig( # large Redshift warehouses. As an example, see this query for the columns: # https://github.com/sqlalchemy-redshift/sqlalchemy-redshift/blob/60b4db04c1d26071c291aeea52f1dcb5dd8b0eb0/sqlalchemy_redshift/dialect.py#L745. scheme: str = Field( - default="redshift+psycopg2", + default="redshift+redshift_connector", description="", hidden_from_schema=True, ) @@ -170,3 +170,24 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict: "The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`." ) return values + + @root_validator(skip_on_failure=True) + def connection_config_compatibility_set(cls, values: Dict) -> Dict: + if ( + ("options" in values and "connect_args" in values["options"]) + and "extra_client_options" in values + and len(values["extra_client_options"]) > 0 + ): + raise ValueError( + "Cannot set both `connect_args` and `extra_client_options` in the config. Please use `extra_client_options` only." + ) + + if "options" in values and "connect_args" in values["options"]: + values["extra_client_options"] = values["options"]["connect_args"] + + if values["extra_client_options"]: + if values["options"]: + values["options"]["connect_args"] = values["extra_client_options"] + else: + values["options"] = {"connect_args": values["extra_client_options"]} + return values diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py index a96171caf9835..92e36fffd6bb4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py @@ -179,14 +179,18 @@ class RedshiftQuery: additional_table_metadata: str = """ select - database, - schema, + ti.database, + ti.schema, "table", size, tbl_rows, estimated_visible_rows, skew_rows, - last_accessed + last_accessed, + case + when smi.name is not null then 1 + else 0 + end as is_materialized from pg_catalog.svv_table_info as ti left join ( @@ -198,8 +202,12 @@ class RedshiftQuery: group by tbl) as la on (la.tbl = ti.table_id) - ; - """ + left join stv_mv_info smi on + smi.db_name = ti.database + and smi.schema = ti.schema + and smi.name = ti.table + ; +""" @staticmethod def stl_scan_based_lineage_query( diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 04f0edf504595..eb635b1292b81 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -6,7 +6,6 @@ import humanfriendly # These imports verify that the dependencies are available. -import psycopg2 # noqa: F401 import pydantic import redshift_connector @@ -352,7 +351,6 @@ def create(cls, config_dict, ctx): def get_redshift_connection( config: RedshiftConfig, ) -> redshift_connector.Connection: - client_options = config.extra_client_options host, port = config.host_port.split(":") conn = redshift_connector.connect( host=host, @@ -360,7 +358,7 @@ def get_redshift_connection( user=config.username, database=config.database, password=config.password.get_secret_value() if config.password else None, - **client_options, + **config.extra_client_options, ) conn.autocommit = True @@ -641,7 +639,7 @@ def gen_view_dataset_workunits( dataset_urn = self.gen_dataset_urn(datahub_dataset_name) if view.ddl: view_properties_aspect = ViewProperties( - materialized=view.type == "VIEW_MATERIALIZED", + materialized=view.materialized, viewLanguage="SQL", viewLogic=view.ddl, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py index 4a13d17d2cc0f..ca81682ae00e4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py @@ -40,6 +40,7 @@ class RedshiftTable(BaseTable): @dataclass class RedshiftView(BaseTable): type: Optional[str] = None + materialized: bool = False columns: List[RedshiftColumn] = field(default_factory=list) last_altered: Optional[datetime] = None size_in_bytes: Optional[int] = None @@ -66,6 +67,7 @@ class RedshiftExtraTableMeta: estimated_visible_rows: Optional[int] = None skew_rows: Optional[float] = None last_accessed: Optional[datetime] = None + is_materialized: bool = False @dataclass @@ -148,6 +150,7 @@ def enrich_tables( ], skew_rows=meta[field_names.index("skew_rows")], last_accessed=meta[field_names.index("last_accessed")], + is_materialized=meta[field_names.index("is_materialized")], ) if table_meta.schema not in table_enrich: table_enrich.setdefault(table_meta.schema, {}) @@ -173,42 +176,23 @@ def get_tables_and_views( logger.info(f"Fetched {len(db_tables)} tables/views from Redshift") for table in db_tables: schema = table[field_names.index("schema")] + table_name = table[field_names.index("relname")] + if table[field_names.index("tabletype")] not in [ "MATERIALIZED VIEW", "VIEW", ]: if schema not in tables: tables.setdefault(schema, []) - table_name = table[field_names.index("relname")] - - creation_time: Optional[datetime] = None - if table[field_names.index("creation_time")]: - creation_time = table[field_names.index("creation_time")].replace( - tzinfo=timezone.utc - ) - last_altered: Optional[datetime] = None - size_in_bytes: Optional[int] = None - rows_count: Optional[int] = None - if schema in enriched_table and table_name in enriched_table[schema]: - if enriched_table[schema][table_name].last_accessed: - # Mypy seems to be not clever enough to understand the above check - last_accessed = enriched_table[schema][table_name].last_accessed - assert last_accessed - last_altered = last_accessed.replace(tzinfo=timezone.utc) - elif creation_time: - last_altered = creation_time - - if enriched_table[schema][table_name].size: - # Mypy seems to be not clever enough to understand the above check - size = enriched_table[schema][table_name].size - if size: - size_in_bytes = size * 1024 * 1024 - - if enriched_table[schema][table_name].estimated_visible_rows: - rows = enriched_table[schema][table_name].estimated_visible_rows - assert rows - rows_count = int(rows) + ( + creation_time, + last_altered, + rows_count, + size_in_bytes, + ) = RedshiftDataDictionary.get_table_stats( + enriched_table, field_names, schema, table + ) tables[schema].append( RedshiftTable( @@ -231,16 +215,37 @@ def get_tables_and_views( else: if schema not in views: views[schema] = [] + ( + creation_time, + last_altered, + rows_count, + size_in_bytes, + ) = RedshiftDataDictionary.get_table_stats( + enriched_table=enriched_table, + field_names=field_names, + schema=schema, + table=table, + ) + + materialized = False + if schema in enriched_table and table_name in enriched_table[schema]: + if enriched_table[schema][table_name].is_materialized: + materialized = True views[schema].append( RedshiftView( type=table[field_names.index("tabletype")], name=table[field_names.index("relname")], ddl=table[field_names.index("view_definition")], - created=table[field_names.index("creation_time")], + created=creation_time, comment=table[field_names.index("table_description")], + last_altered=last_altered, + size_in_bytes=size_in_bytes, + rows_count=rows_count, + materialized=materialized, ) ) + for schema_key, schema_tables in tables.items(): logger.info( f"In schema: {schema_key} discovered {len(schema_tables)} tables" @@ -250,6 +255,39 @@ def get_tables_and_views( return tables, views + @staticmethod + def get_table_stats(enriched_table, field_names, schema, table): + table_name = table[field_names.index("relname")] + + creation_time: Optional[datetime] = None + if table[field_names.index("creation_time")]: + creation_time = table[field_names.index("creation_time")].replace( + tzinfo=timezone.utc + ) + last_altered: Optional[datetime] = None + size_in_bytes: Optional[int] = None + rows_count: Optional[int] = None + if schema in enriched_table and table_name in enriched_table[schema]: + if enriched_table[schema][table_name].last_accessed: + # Mypy seems to be not clever enough to understand the above check + last_accessed = enriched_table[schema][table_name].last_accessed + assert last_accessed + last_altered = last_accessed.replace(tzinfo=timezone.utc) + elif creation_time: + last_altered = creation_time + + if enriched_table[schema][table_name].size: + # Mypy seems to be not clever enough to understand the above check + size = enriched_table[schema][table_name].size + if size: + size_in_bytes = size * 1024 * 1024 + + if enriched_table[schema][table_name].estimated_visible_rows: + rows = enriched_table[schema][table_name].estimated_visible_rows + assert rows + rows_count = int(rows) + return creation_time, last_altered, rows_count, size_in_bytes + @staticmethod def get_schema_fields_for_column( column: RedshiftColumn,