Skip to content

Commit

Permalink
feat(ingestion/snowflake): use user email-id in urn generation for to…
Browse files Browse the repository at this point in the history
…p users stat (#8513)

Co-authored-by: MohdSiddiqueBagwan <[email protected]>
  • Loading branch information
siddiquebagwan and siddiquebagwan-gslab authored Aug 3, 2023
1 parent 037ae14 commit 6a36118
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 17 deletions.
2 changes: 1 addition & 1 deletion docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ certain column-level metrics. Instead, set `profile_table_level_only` to `false`
individually enable / disable desired field metrics.
- #8451: The `bigquery-beta` and `snowflake-beta` source aliases have been dropped. Use `bigquery` and `snowflake` as the source type instead.
- #8472: Ingestion runs created with Pipeline.create will show up in the DataHub ingestion tab as CLI-based runs. To revert to the previous behavior of not showing these runs in DataHub, pass `no_default_report=True`.

- #8513: `snowflake` connector will use user's `email` attribute as is in urn. To revert to previous behavior disable `email_as_user_identifier` in recipe.
### Potential Downtime

- BrowsePathsV2 upgrade will now be handled by the `system-update` job in non-blocking mode. This process generates data needed for the new search
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ class SnowflakeV2Config(
"upstreams_deny_pattern", "temporary_tables_pattern"
)

email_as_user_identifier: bool = Field(
default=True,
description="Format user urns as an email, if the snowflake user's email is set. If `email_domain` is provided, generates email addresses for snowflake users with unset emails, based on their username.",
)

@validator("include_column_lineage")
def validate_include_column_lineage(cls, v, values):
if not values.get("include_table_lineage") and v:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ def build_usage_statistics_for_dataset(self, dataset_identifier, row):
)
if self.config.include_top_n_queries
else None,
userCounts=self._map_user_counts(json.loads(row["USER_COUNTS"])),
userCounts=self._map_user_counts(
json.loads(row["USER_COUNTS"]),
),
fieldCounts=self._map_field_counts(json.loads(row["FIELD_COUNTS"])),
)

Expand Down Expand Up @@ -247,7 +249,10 @@ def _map_top_sql_queries(self, top_sql_queries: Dict) -> List[str]:
]
)

def _map_user_counts(self, user_counts: Dict) -> List[DatasetUserUsageCounts]:
def _map_user_counts(
self,
user_counts: Dict,
) -> List[DatasetUserUsageCounts]:
filtered_user_counts = []
for user_count in user_counts:
user_email = user_count.get("email")
Expand All @@ -261,7 +266,11 @@ def _map_user_counts(self, user_counts: Dict) -> List[DatasetUserUsageCounts]:
filtered_user_counts.append(
DatasetUserUsageCounts(
user=make_user_urn(
self.get_user_identifier(user_count["user_name"], user_email)
self.get_user_identifier(
user_count["user_name"],
user_email,
self.config.email_as_user_identifier,
)
),
count=user_count["total"],
# NOTE: Generated emails may be incorrect, as email may be different than
Expand Down Expand Up @@ -347,6 +356,7 @@ def _check_usage_date_ranges(self) -> Any:
def _get_operation_aspect_work_unit(
self, event: SnowflakeJoinedAccessEvent, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:

if event.query_start_time and event.query_type:
start_time = event.query_start_time
query_type = event.query_type
Expand All @@ -357,7 +367,11 @@ def _get_operation_aspect_work_unit(
)
reported_time: int = int(time.time() * 1000)
last_updated_timestamp: int = int(start_time.timestamp() * 1000)
user_urn = make_user_urn(self.get_user_identifier(user_name, user_email))
user_urn = make_user_urn(
self.get_user_identifier(
user_name, user_email, self.config.email_as_user_identifier
)
)

# NOTE: In earlier `snowflake-usage` connector this was base_objects_accessed, which is incorrect
for obj in event.objects_modified:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,17 @@ def get_dataset_identifier_from_qualified_name(
# Users without email were skipped from both user entries as well as aggregates.
# However email is not mandatory field in snowflake user, user_name is always present.
def get_user_identifier(
self: SnowflakeCommonProtocol, user_name: str, user_email: Optional[str]
self: SnowflakeCommonProtocol,
user_name: str,
user_email: Optional[str],
email_as_user_identifier: bool,
) -> str:
if user_email:
return self.snowflake_identifier(user_email.split("@")[0])
return self.snowflake_identifier(
user_email
if email_as_user_identifier is True
else user_email.split("@")[0]
)
return self.snowflake_identifier(user_name)

# TODO: Revisit this after stateful ingestion can commit checkpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5779,7 +5779,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5801,7 +5801,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5823,7 +5823,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5845,7 +5845,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5867,7 +5867,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5889,7 +5889,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5911,7 +5911,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5933,7 +5933,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5955,7 +5955,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5977,7 +5977,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
use_legacy_lineage_method=False,
validate_upstreams_against_patterns=False,
include_operational_stats=True,
email_as_user_identifier=True,
start_time=datetime(2022, 6, 6, 7, 17, 0, 0).replace(
tzinfo=timezone.utc
),
Expand Down

0 comments on commit 6a36118

Please sign in to comment.