Skip to content

Commit

Permalink
fix(ingest/redshift): Identify materialized views properly + fix connection args support (datahub-project#9368)
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored and Salman-Apptware committed Dec 15, 2023
1 parent c5dd145 commit dd3d149
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 48 deletions.
4 changes: 2 additions & 2 deletions metadata-ingestion/docs/sources/redshift/redshift_recipe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ source:

options:
connect_args:
sslmode: "prefer" # or "require" or "verify-ca"
sslrootcert: ~ # needed to unpin the AWS Redshift certificate
# check all available options here: https://pypi.org/project/redshift-connector/
ssl_insecure: "false" # Specifies whether the IdP host's server certificate will be verified

sink:
# sink configs
8 changes: 2 additions & 6 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@
redshift_common = {
# Clickhouse 0.8.3 adds support for SQLAlchemy 1.4.x
"sqlalchemy-redshift>=0.8.3",
"psycopg2-binary",
"GeoAlchemy2",
"redshift-connector",
*sqllineage_lib,
*path_spec_common,
}
Expand Down Expand Up @@ -365,11 +365,7 @@
| {"psycopg2-binary", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
"redshift": sql_common
| redshift_common
| usage_common
| {"redshift-connector"}
| sqlglot_lib,
"redshift": sql_common | redshift_common | usage_common | sqlglot_lib,
"s3": {*s3_base, *data_lake_profiling},
"gcs": {*s3_base, *data_lake_profiling},
"sagemaker": aws_common,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class RedshiftConfig(
# large Redshift warehouses. As an example, see this query for the columns:
# https://github.com/sqlalchemy-redshift/sqlalchemy-redshift/blob/60b4db04c1d26071c291aeea52f1dcb5dd8b0eb0/sqlalchemy_redshift/dialect.py#L745.
scheme: str = Field(
default="redshift+psycopg2",
default="redshift+redshift_connector",
description="",
hidden_from_schema=True,
)
Expand Down Expand Up @@ -170,3 +170,24 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict:
"The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`."
)
return values

@root_validator(skip_on_failure=True)
def connection_config_compatibility_set(cls, values: Dict) -> Dict:
    """Reconcile the legacy `options.connect_args` setting with the newer
    `extra_client_options` setting.

    Rules enforced:
      * Setting both `options.connect_args` and a non-empty
        `extra_client_options` is an error.
      * A legacy `options.connect_args` value is mirrored into
        `extra_client_options`.
      * Whatever ends up in `extra_client_options` is mirrored back into
        `options.connect_args` so downstream SQLAlchemy-style consumers
        keep working.
    """
    # Hoist the repeated membership test; both branches below need it.
    has_legacy_connect_args = (
        "options" in values and "connect_args" in values["options"]
    )

    if (
        has_legacy_connect_args
        and "extra_client_options" in values
        and len(values["extra_client_options"]) > 0
    ):
        raise ValueError(
            "Cannot set both `connect_args` and `extra_client_options` in the config. Please use `extra_client_options` only."
        )

    if has_legacy_connect_args:
        values["extra_client_options"] = values["options"]["connect_args"]

    # Use .get() here: unlike the guarded accesses above, these keys are not
    # known to be present (direct indexing previously risked a KeyError).
    if values.get("extra_client_options"):
        if values.get("options"):
            values["options"]["connect_args"] = values["extra_client_options"]
        else:
            values["options"] = {"connect_args": values["extra_client_options"]}
    return values
18 changes: 13 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,18 @@ class RedshiftQuery:

additional_table_metadata: str = """
select
database,
schema,
ti.database,
ti.schema,
"table",
size,
tbl_rows,
estimated_visible_rows,
skew_rows,
last_accessed
last_accessed,
case
when smi.name is not null then 1
else 0
end as is_materialized
from
pg_catalog.svv_table_info as ti
left join (
Expand All @@ -198,8 +202,12 @@ class RedshiftQuery:
group by
tbl) as la on
(la.tbl = ti.table_id)
;
"""
left join stv_mv_info smi on
smi.db_name = ti.database
and smi.schema = ti.schema
and smi.name = ti.table
;
"""

@staticmethod
def stl_scan_based_lineage_query(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import humanfriendly

# These imports verify that the dependencies are available.
import psycopg2 # noqa: F401
import pydantic
import redshift_connector

Expand Down Expand Up @@ -352,15 +351,14 @@ def create(cls, config_dict, ctx):
def get_redshift_connection(
config: RedshiftConfig,
) -> redshift_connector.Connection:
client_options = config.extra_client_options
host, port = config.host_port.split(":")
conn = redshift_connector.connect(
host=host,
port=int(port),
user=config.username,
database=config.database,
password=config.password.get_secret_value() if config.password else None,
**client_options,
**config.extra_client_options,
)

conn.autocommit = True
Expand Down Expand Up @@ -641,7 +639,7 @@ def gen_view_dataset_workunits(
dataset_urn = self.gen_dataset_urn(datahub_dataset_name)
if view.ddl:
view_properties_aspect = ViewProperties(
materialized=view.type == "VIEW_MATERIALIZED",
materialized=view.materialized,
viewLanguage="SQL",
viewLogic=view.ddl,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class RedshiftTable(BaseTable):
@dataclass
class RedshiftView(BaseTable):
type: Optional[str] = None
materialized: bool = False
columns: List[RedshiftColumn] = field(default_factory=list)
last_altered: Optional[datetime] = None
size_in_bytes: Optional[int] = None
Expand All @@ -66,6 +67,7 @@ class RedshiftExtraTableMeta:
estimated_visible_rows: Optional[int] = None
skew_rows: Optional[float] = None
last_accessed: Optional[datetime] = None
is_materialized: bool = False


@dataclass
Expand Down Expand Up @@ -148,6 +150,7 @@ def enrich_tables(
],
skew_rows=meta[field_names.index("skew_rows")],
last_accessed=meta[field_names.index("last_accessed")],
is_materialized=meta[field_names.index("is_materialized")],
)
if table_meta.schema not in table_enrich:
table_enrich.setdefault(table_meta.schema, {})
Expand All @@ -173,42 +176,23 @@ def get_tables_and_views(
logger.info(f"Fetched {len(db_tables)} tables/views from Redshift")
for table in db_tables:
schema = table[field_names.index("schema")]
table_name = table[field_names.index("relname")]

if table[field_names.index("tabletype")] not in [
"MATERIALIZED VIEW",
"VIEW",
]:
if schema not in tables:
tables.setdefault(schema, [])
table_name = table[field_names.index("relname")]

creation_time: Optional[datetime] = None
if table[field_names.index("creation_time")]:
creation_time = table[field_names.index("creation_time")].replace(
tzinfo=timezone.utc
)

last_altered: Optional[datetime] = None
size_in_bytes: Optional[int] = None
rows_count: Optional[int] = None
if schema in enriched_table and table_name in enriched_table[schema]:
if enriched_table[schema][table_name].last_accessed:
# Mypy seems to be not clever enough to understand the above check
last_accessed = enriched_table[schema][table_name].last_accessed
assert last_accessed
last_altered = last_accessed.replace(tzinfo=timezone.utc)
elif creation_time:
last_altered = creation_time

if enriched_table[schema][table_name].size:
# Mypy seems to be not clever enough to understand the above check
size = enriched_table[schema][table_name].size
if size:
size_in_bytes = size * 1024 * 1024

if enriched_table[schema][table_name].estimated_visible_rows:
rows = enriched_table[schema][table_name].estimated_visible_rows
assert rows
rows_count = int(rows)
(
creation_time,
last_altered,
rows_count,
size_in_bytes,
) = RedshiftDataDictionary.get_table_stats(
enriched_table, field_names, schema, table
)

tables[schema].append(
RedshiftTable(
Expand All @@ -231,16 +215,37 @@ def get_tables_and_views(
else:
if schema not in views:
views[schema] = []
(
creation_time,
last_altered,
rows_count,
size_in_bytes,
) = RedshiftDataDictionary.get_table_stats(
enriched_table=enriched_table,
field_names=field_names,
schema=schema,
table=table,
)

materialized = False
if schema in enriched_table and table_name in enriched_table[schema]:
if enriched_table[schema][table_name].is_materialized:
materialized = True

views[schema].append(
RedshiftView(
type=table[field_names.index("tabletype")],
name=table[field_names.index("relname")],
ddl=table[field_names.index("view_definition")],
created=table[field_names.index("creation_time")],
created=creation_time,
comment=table[field_names.index("table_description")],
last_altered=last_altered,
size_in_bytes=size_in_bytes,
rows_count=rows_count,
materialized=materialized,
)
)

for schema_key, schema_tables in tables.items():
logger.info(
f"In schema: {schema_key} discovered {len(schema_tables)} tables"
Expand All @@ -250,6 +255,39 @@ def get_tables_and_views(

return tables, views

@staticmethod
def get_table_stats(enriched_table, field_names, schema, table):
table_name = table[field_names.index("relname")]

creation_time: Optional[datetime] = None
if table[field_names.index("creation_time")]:
creation_time = table[field_names.index("creation_time")].replace(
tzinfo=timezone.utc
)
last_altered: Optional[datetime] = None
size_in_bytes: Optional[int] = None
rows_count: Optional[int] = None
if schema in enriched_table and table_name in enriched_table[schema]:
if enriched_table[schema][table_name].last_accessed:
# Mypy seems to be not clever enough to understand the above check
last_accessed = enriched_table[schema][table_name].last_accessed
assert last_accessed
last_altered = last_accessed.replace(tzinfo=timezone.utc)
elif creation_time:
last_altered = creation_time

if enriched_table[schema][table_name].size:
# Mypy seems to be not clever enough to understand the above check
size = enriched_table[schema][table_name].size
if size:
size_in_bytes = size * 1024 * 1024

if enriched_table[schema][table_name].estimated_visible_rows:
rows = enriched_table[schema][table_name].estimated_visible_rows
assert rows
rows_count = int(rows)
return creation_time, last_altered, rows_count, size_in_bytes

@staticmethod
def get_schema_fields_for_column(
column: RedshiftColumn,
Expand Down

0 comments on commit dd3d149

Please sign in to comment.