feat(ingest/snowflake): operation aspect from Information Schema last… #8886

Closed
@@ -105,6 +105,14 @@ class SnowflakeV2Config(
         description="Populates view->view and table->view column lineage.",
     )
 
+    emit_last_updated_operation_from_ischema: bool = Field(
+        default=True,
+        description="Whether to emit an operation aspect carrying the table's last-updated timestamp from the information schema. "
+        "Note that the exact query, operation type, and actor cannot be known from the information schema, so they will be missing or "
+        "set to UNKNOWN. This is still useful for indicating table freshness, given that the snowflake account_usage views "
+        "used to ingest detailed operation history may have a latency of up to 3 hours.",
+    )
+
     _check_role_grants_removed = pydantic_removed_field("check_role_grants")
     _provision_role_removed = pydantic_removed_field("provision_role")

Review comment (Collaborator), on `emit_last_updated_operation_from_ischema`:

> Personally, I don't get "information schema" from "ischema".
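For context, a hypothetical sketch of where the new flag sits in an ingestion recipe, shown in the Python dict form a recipe deserializes to; all connection and sink values are placeholders, not taken from this PR:

```python
# Hypothetical recipe fragment showing the new flag in context.
# Account, warehouse, and sink values are placeholders.
recipe = {
    "source": {
        "type": "snowflake",
        "config": {
            "account_id": "my_account",  # placeholder
            "warehouse": "COMPUTE_WH",  # placeholder
            # New in this PR: set to False to suppress the
            # information-schema-based operation aspect.
            "emit_last_updated_operation_from_ischema": True,
        },
    },
    "sink": {
        "type": "datahub-rest",
        "config": {"server": "http://localhost:8080"},  # placeholder
    },
}
```

Because the flag defaults to True, existing recipes pick up the freshness signal without changes; the review question above is about the "ischema" abbreviation in the name, not the behavior.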
@@ -302,8 +302,8 @@ def operational_data_for_time_window(
     ) -> str:
         return f"""
         SELECT
-            -- access_history.query_id, -- only for debugging purposes
-            access_history.query_start_time AS "QUERY_START_TIME",
+            -- query_history.query_id, -- only for debugging purposes
+            query_history.start_time AS "QUERY_START_TIME",
             query_history.query_text AS "QUERY_TEXT",
             query_history.query_type AS "QUERY_TYPE",
             query_history.rows_inserted AS "ROWS_INSERTED",
@@ -314,7 +314,7 @@ def operational_data_for_time_window(
             access_history.objects_modified AS "OBJECTS_MODIFIED",
             -- query_history.execution_status, -- not really necessary, but should equal "SUCCESS"
             -- query_history.warehouse_name,
-            access_history.user_name AS "USER_NAME",
+            query_history.user_name AS "USER_NAME",
             users.first_name AS "FIRST_NAME",
             users.last_name AS "LAST_NAME",
             users.display_name AS "DISPLAY_NAME",
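After this change, the start time and user name are sourced from query_history rather than access_history, leaving access_history needed only for the objects_modified payload. A hedged sketch of the statement's presumed overall shape, reconstructed solely from the columns visible above (the join keys and view names are inferences, not copied from the source file):

```python
# Assumed skeleton of the statement the columns above are selected from.
# The account_usage view names and join keys are inferred from the column
# references in this diff, not verbatim from the repository.
OPERATIONAL_DATA_SKELETON = """
SELECT
    query_history.start_time AS "QUERY_START_TIME",
    query_history.user_name AS "USER_NAME",
    access_history.objects_modified AS "OBJECTS_MODIFIED",
    users.first_name AS "FIRST_NAME"
FROM snowflake.account_usage.access_history access_history
JOIN snowflake.account_usage.query_history query_history
    ON access_history.query_id = query_history.query_id
LEFT JOIN snowflake.account_usage.users users
    ON query_history.user_name = users.name
"""
```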
@@ -11,6 +11,7 @@
 
 from datahub.configuration.pattern_utils import is_schema_allowed
 from datahub.emitter.mce_builder import (
+    get_sys_time,
     make_data_platform_urn,
     make_dataset_urn,
     make_dataset_urn_with_platform_instance,
@@ -106,6 +107,8 @@
 )
 from datahub.metadata.com.linkedin.pegasus2avro.common import (
     GlobalTags,
+    Operation,
+    OperationType,
     Status,
     SubTypes,
     TagAssociation,
@@ -136,6 +139,7 @@
 from datahub.utilities.perf_timer import PerfTimer
 from datahub.utilities.registries.domain_registry import DomainRegistry
 from datahub.utilities.sqlglot_lineage import SchemaResolver
+from datahub.utilities.time import datetime_to_ts_millis
 
 logger: logging.Logger = logging.getLogger(__name__)
 
@@ -926,8 +930,12 @@ def fetch_sample_data_for_classification(
         )
 
     def fetch_foreign_keys_for_table(
-        self, table, schema_name, db_name, table_identifier
-    ):
+        self,
+        table: SnowflakeTable,
+        schema_name: str,
+        db_name: str,
+        table_identifier: str,
+    ) -> None:
         try:
             table.foreign_keys = self.get_fk_constraints_for_table(
                 table.name, schema_name, db_name
@@ -939,7 +947,13 @@
             )
             self.report_warning("Failed to get foreign key for table", table_identifier)
 
-    def fetch_pk_for_table(self, table, schema_name, db_name, table_identifier):
+    def fetch_pk_for_table(
+        self,
+        table: SnowflakeTable,
+        schema_name: str,
+        db_name: str,
+        table_identifier: str,
+    ) -> None:
         try:
             table.pk = self.get_pk_constraints_for_table(
                 table.name, schema_name, db_name
@@ -951,7 +965,13 @@ def fetch_pk_for_table(self, table, schema_name, db_name, table_identifier):
             )
             self.report_warning("Failed to get primary key for table", table_identifier)
 
-    def fetch_columns_for_table(self, table, schema_name, db_name, table_identifier):
+    def fetch_columns_for_table(
+        self,
+        table: SnowflakeTable,
+        schema_name: str,
+        db_name: str,
+        table_identifier: str,
+    ) -> None:
         try:
             table.columns = self.get_columns_for_table(table.name, schema_name, db_name)
             table.column_count = len(table.columns)
@@ -1114,6 +1134,16 @@ def gen_dataset_workunits(
                 entityUrn=dataset_urn, aspect=view_properties_aspect
             ).as_workunit()
 
+        if self.config.emit_last_updated_operation_from_ischema and table.last_altered:
+            operation_aspect = Operation(
+                timestampMillis=get_sys_time(),
+                operationType=OperationType.UNKNOWN,
+                lastUpdatedTimestamp=datetime_to_ts_millis(table.last_altered),
+            )
+            yield MetadataChangeProposalWrapper(
+                entityUrn=dataset_urn, aspect=operation_aspect
+            ).as_workunit()
+
     def get_dataset_properties(
         self,
         table: Union[SnowflakeTable, SnowflakeView],
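To see what the new block emits, here is a self-contained sketch that builds an equivalent Operation aspect with illustrative values; the urn, the timestamps, and the use of the generated `*Class` aliases are assumptions for the example, not code from this PR:

```python
from datetime import datetime, timezone

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import OperationClass, OperationTypeClass

# Illustrative stand-in for table.last_altered from the information schema.
last_altered = datetime(2023, 9, 1, 12, 0, tzinfo=timezone.utc)

mcp = MetadataChangeProposalWrapper(
    entityUrn=make_dataset_urn("snowflake", "my_db.my_schema.my_table"),  # placeholder
    aspect=OperationClass(
        # When the ingestion observed the change, not when it happened.
        timestampMillis=int(datetime.now(timezone.utc).timestamp() * 1000),
        # The information schema cannot reveal the real operation type.
        operationType=OperationTypeClass.UNKNOWN,
        # The freshness signal: the table's last_altered time.
        lastUpdatedTimestamp=int(last_altered.timestamp() * 1000),
    ),
)
```

The trade-off spelled out in the config description shows up here directly: `lastUpdatedTimestamp` is trustworthy while the operation type and actor are not, but consumers get the freshness signal hours before the account_usage-based operation history catches up.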
@@ -1122,12 +1152,12 @@ def get_dataset_properties(
     ) -> DatasetProperties:
         return DatasetProperties(
             name=table.name,
-            created=TimeStamp(time=int(table.created.timestamp() * 1000))
+            created=TimeStamp(time=datetime_to_ts_millis(table.created))
             if table.created is not None
             else None,
-            lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000))
+            lastModified=TimeStamp(time=datetime_to_ts_millis(table.last_altered))
             if table.last_altered is not None
-            else TimeStamp(time=int(table.created.timestamp() * 1000))
+            else TimeStamp(time=datetime_to_ts_millis(table.created))
             if table.created is not None
             else None,
             description=table.comment,
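This hunk, and the two container hunks below, replace the repeated inline `int(... .timestamp() * 1000)` expression with the shared helper imported earlier. A sketch of the presumed equivalence, with the helper body assumed rather than copied from datahub.utilities.time:

```python
from datetime import datetime, timezone

def datetime_to_ts_millis(dt: datetime) -> int:
    # Assumed to match datahub.utilities.time.datetime_to_ts_millis:
    # an epoch-milliseconds conversion equivalent to the inline
    # expression it replaces throughout this diff.
    return int(round(dt.timestamp() * 1000))

ts = datetime(2023, 9, 1, tzinfo=timezone.utc)
assert datetime_to_ts_millis(ts) == int(ts.timestamp() * 1000)
```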
@@ -1311,12 +1341,12 @@ def gen_database_containers(
             if self.config.include_external_url
             else None,
             description=database.comment,
-            created=int(database.created.timestamp() * 1000)
+            created=datetime_to_ts_millis(database.created)
             if database.created is not None
             else None,
-            last_modified=int(database.last_altered.timestamp() * 1000)
+            last_modified=datetime_to_ts_millis(database.last_altered)
             if database.last_altered is not None
-            else int(database.created.timestamp() * 1000)
+            else datetime_to_ts_millis(database.created)
             if database.created is not None
             else None,
             tags=[self.snowflake_identifier(tag.identifier()) for tag in database.tags]
@@ -1356,12 +1386,12 @@ def gen_schema_containers(
             external_url=self.get_external_url_for_schema(schema.name, db_name)
             if self.config.include_external_url
             else None,
-            created=int(schema.created.timestamp() * 1000)
+            created=datetime_to_ts_millis(schema.created)
             if schema.created is not None
             else None,
-            last_modified=int(schema.last_altered.timestamp() * 1000)
+            last_modified=datetime_to_ts_millis(schema.last_altered)
             if schema.last_altered is not None
-            else int(schema.created.timestamp() * 1000)
+            else datetime_to_ts_millis(schema.created)
             if schema.created is not None
             else None,
             tags=[self.snowflake_identifier(tag.identifier()) for tag in schema.tags]