Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/snowflake): use user email-id in urn generation for top users stat #8513

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ certain column-level metrics. Instead, set `profile_table_level_only` to `false`
individually enable / disable desired field metrics.
- #8451: The `bigquery-beta` and `snowflake-beta` source aliases have been dropped. Use `bigquery` and `snowflake` as the source type instead.
- #8472: Ingestion runs created with Pipeline.create will show up in the DataHub ingestion tab as CLI-based runs. To revert to the previous behavior of not showing these runs in DataHub, pass `no_default_report=True`.

- #8513: `snowflake` connector will use user's `email` attribute as is in urn. To revert to previous behavior disable `email_as_user_identifier` in recipe.
### Potential Downtime

### Deprecations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ class SnowflakeV2Config(
"upstreams_deny_pattern", "temporary_tables_pattern"
)

email_as_user_identifier: bool = Field(
default=True,
description="Format user urns as an email, if the snowflake user's email is set. If `email_domain` is provided, generates email addresses for snowflake users with unset emails, based on their username.",
)

@validator("include_column_lineage")
def validate_include_column_lineage(cls, v, values):
if not values.get("include_table_lineage") and v:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ def build_usage_statistics_for_dataset(self, dataset_identifier, row):
)
if self.config.include_top_n_queries
else None,
userCounts=self._map_user_counts(json.loads(row["USER_COUNTS"])),
userCounts=self._map_user_counts(
json.loads(row["USER_COUNTS"]),
),
fieldCounts=self._map_field_counts(json.loads(row["FIELD_COUNTS"])),
)

Expand Down Expand Up @@ -247,7 +249,10 @@ def _map_top_sql_queries(self, top_sql_queries: Dict) -> List[str]:
]
)

def _map_user_counts(self, user_counts: Dict) -> List[DatasetUserUsageCounts]:
def _map_user_counts(
self,
user_counts: Dict,
) -> List[DatasetUserUsageCounts]:
filtered_user_counts = []
for user_count in user_counts:
user_email = user_count.get("email")
Expand All @@ -261,7 +266,11 @@ def _map_user_counts(self, user_counts: Dict) -> List[DatasetUserUsageCounts]:
filtered_user_counts.append(
DatasetUserUsageCounts(
user=make_user_urn(
self.get_user_identifier(user_count["user_name"], user_email)
self.get_user_identifier(
user_count["user_name"],
user_email,
self.config.email_as_user_identifier,
)
),
count=user_count["total"],
# NOTE: Generated emails may be incorrect, as email may be different than
Expand Down Expand Up @@ -347,6 +356,7 @@ def _check_usage_date_ranges(self) -> Any:
def _get_operation_aspect_work_unit(
self, event: SnowflakeJoinedAccessEvent, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:

if event.query_start_time and event.query_type:
start_time = event.query_start_time
query_type = event.query_type
Expand All @@ -357,7 +367,11 @@ def _get_operation_aspect_work_unit(
)
reported_time: int = int(time.time() * 1000)
last_updated_timestamp: int = int(start_time.timestamp() * 1000)
user_urn = make_user_urn(self.get_user_identifier(user_name, user_email))
user_urn = make_user_urn(
self.get_user_identifier(
user_name, user_email, self.config.email_as_user_identifier
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add logic to generate an email from self.config.email_domain if it's set, like we do in _map_user_counts? Perhaps it'd be best to put that logic in get_user_identifier instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's keep it as is. The current method uses the user_email to determine whether to skip the current metadata (i.e continue in for loop)

)
)

# NOTE: In earlier `snowflake-usage` connector this was base_objects_accessed, which is incorrect
for obj in event.objects_modified:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,17 @@ def get_dataset_identifier_from_qualified_name(
# Users without email were skipped from both user entries as well as aggregates.
# However email is not mandatory field in snowflake user, user_name is always present.
def get_user_identifier(
self: SnowflakeCommonProtocol, user_name: str, user_email: Optional[str]
self: SnowflakeCommonProtocol,
user_name: str,
user_email: Optional[str],
email_as_user_identifier: bool,
) -> str:
if user_email:
return self.snowflake_identifier(user_email.split("@")[0])
return self.snowflake_identifier(
user_email
if email_as_user_identifier is True
else user_email.split("@")[0]
)
return self.snowflake_identifier(user_name)

# TODO: Revisit this after stateful ingestion can commit checkpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5779,7 +5779,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5801,7 +5801,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5823,7 +5823,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5845,7 +5845,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5867,7 +5867,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5889,7 +5889,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5911,7 +5911,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5933,7 +5933,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5955,7 +5955,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand All @@ -5977,7 +5977,7 @@
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:abc",
"actor": "urn:li:corpuser:abc@xyz.com",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1654144861367
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
use_legacy_lineage_method=False,
validate_upstreams_against_patterns=False,
include_operational_stats=True,
email_as_user_identifier=True,
start_time=datetime(2022, 6, 6, 7, 17, 0, 0).replace(
tzinfo=timezone.utc
),
Expand Down
Loading