Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion): extend feast plugin to ingest tags and owners #11784

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions metadata-ingestion/scripts/datahub_preflight.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash -e
#!/bin/bash -e

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this space change?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

#From https://stackoverflow.com/questions/4023830/how-to-compare-two-strings-in-dot-separated-version-format-in-bash
verlte() {
Expand Down Expand Up @@ -45,7 +45,7 @@ arm64_darwin_preflight() {
pip3 install --no-use-pep517 scipy
fi

brew_install "openssl@1.1"
brew_install "openssl@3.0.14"
brew install "postgresql@14"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick question - Why this change?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I needed to upgrade the dependency version because the 1.1 version was deprecated.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted to the previous version


# postgresql installs libs in a strange way
Expand Down
64 changes: 58 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/feast.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata._schema_classes import (
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import MLFeatureDataType
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
MLFeatureSnapshot,
Expand All @@ -42,10 +47,12 @@
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
BrowsePathsClass,
GlobalTagsClass,
MLFeaturePropertiesClass,
MLFeatureTablePropertiesClass,
MLPrimaryKeyPropertiesClass,
StatusClass,
TagAssociationClass,
)

# FIXME: ValueType module cannot be used as a type
Expand Down Expand Up @@ -216,9 +223,26 @@ def _get_entity_workunit(

feature_view_name = f"{self.feature_store.project}.{feature_view.name}"

aspects = [StatusClass(removed=False)]

if entity.tags.get("name"):
tag: str = entity.tags.get("name")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor -

Any way to extract the owner and tags logic into a reusable method

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already created a reusable method for tags and owner logic. Only thing missing is regarding the _owner_mapping (since we can only ingest the owner from Feast and we need to define the owner_type owner_ship_type_class, the solution i'm seeing is having a mapping to these values):

_owner_mapping: Dict[str, Dict[str, Any]] = { "Datahub": { "owner_type": builder.OwnerType.USER, "owner_ship_type_class": OwnershipTypeClass.DATAOWNER, } }

the issue here is having to specify in Datahub repo the specific use cases that each team might have (for example, in our case we will have an owner "MLOps") which might not be applicable to other teams.
Do you see any other possible solution?

tag_association = TagAssociationClass(tag=builder.make_tag_urn(tag))
global_tags_aspect = GlobalTagsClass(tags=[tag_association])
aspects.append(global_tags_aspect)

if entity.owner:
owner = entity.owner
owner_association = OwnerClass(
owner=builder.make_owner_urn(owner, owner_type=builder.OwnerType.USER),
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
owners_aspect = OwnershipClass(owners=[owner_association])
aspects.append(owners_aspect)

entity_snapshot = MLPrimaryKeySnapshot(
urn=builder.make_ml_primary_key_urn(feature_view_name, entity.name),
aspects=[StatusClass(removed=False)],
aspects=aspects,
)

entity_snapshot.aspects.append(
Expand All @@ -243,10 +267,20 @@ def _get_feature_workunit(
Generate an MLFeature work unit for a Feast feature.
"""
feature_view_name = f"{self.feature_store.project}.{feature_view.name}"
global_tags_aspect = None

if field.tags.get("name"):
tag_name = field.tags.get("name")
tag_association = TagAssociationClass(tag=builder.make_tag_urn(tag_name))
global_tags_aspect = GlobalTagsClass(tags=[tag_association])

aspects = [StatusClass(removed=False)]
if global_tags_aspect is not None:
aspects.append(global_tags_aspect)

feature_snapshot = MLFeatureSnapshot(
urn=builder.make_ml_feature_urn(feature_view_name, field.name),
aspects=[StatusClass(removed=False)],
aspects=aspects,
)

feature_sources = []
Expand Down Expand Up @@ -296,12 +330,29 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU

feature_view_name = f"{self.feature_store.project}.{feature_view.name}"

aspects = [
BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]),
StatusClass(removed=False),
]

if feature_view.tags.get("name"):
tag = feature_view.tags.get("name")
tag_association = TagAssociationClass(tag=builder.make_tag_urn(tag))
global_tags_aspect = GlobalTagsClass(tags=[tag_association])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just note: If you've attached tags in DataHub, this will replace them by default :)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal is to have Feast as our source of truth, so the fields must be defined in Feast repo.
Do you agree with this?

aspects.append(global_tags_aspect)

if feature_view.owner:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just note: If you've attached owners in DataHub UI, this will replace them by default :)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above

owner = feature_view.owner
owner_association = OwnerClass(
owner=builder.make_owner_urn(owner, owner_type=builder.OwnerType.USER),
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
owners_aspect = OwnershipClass(owners=[owner_association])
aspects.append(owners_aspect)

feature_view_snapshot = MLFeatureTableSnapshot(
urn=builder.make_ml_feature_table_urn("feast", feature_view_name),
aspects=[
BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]),
StatusClass(removed=False),
],
aspects=aspects,
)

feature_view_snapshot.aspects.append(
Expand Down Expand Up @@ -366,6 +417,7 @@ def create(cls, config_dict, ctx):
return cls(config, ctx)

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

for feature_view in self.feature_store.list_feature_views():
for entity_name in feature_view.entities:
entity = self.feature_store.get_entity(entity_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": {
"customProperties": {},
"description": "Driver ID",
"dataType": "ORDINAL",
"sources": [
Expand All @@ -23,7 +24,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test"
"runId": "feast-repository-test",
"lastRunId": "no-run-id-provided"
}
},
{
Expand All @@ -38,6 +40,7 @@
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"customProperties": {},
"description": "Conv rate",
"dataType": "CONTINUOUS",
"sources": [
Expand All @@ -50,7 +53,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test"
"runId": "feast-repository-test",
"lastRunId": "no-run-id-provided"
}
},
{
Expand All @@ -65,6 +69,7 @@
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"customProperties": {},
"description": "Acc rate",
"dataType": "CONTINUOUS",
"sources": [
Expand All @@ -77,7 +82,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test"
"runId": "feast-repository-test",
"lastRunId": "no-run-id-provided"
}
},
{
Expand All @@ -92,6 +98,7 @@
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"customProperties": {},
"description": "Avg daily trips",
"dataType": "ORDINAL",
"sources": [
Expand All @@ -104,7 +111,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test"
"runId": "feast-repository-test",
"lastRunId": "no-run-id-provided"
}
},
{
Expand All @@ -119,6 +127,7 @@
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"customProperties": {},
"description": "String feature",
"dataType": "TEXT",
"sources": [
Expand All @@ -131,7 +140,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test"
"runId": "feast-repository-test",
"lastRunId": "no-run-id-provided"
}
},
{
Expand Down Expand Up @@ -170,7 +180,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test"
"runId": "feast-repository-test",
"lastRunId": "no-run-id-provided"
}
},
{
Expand All @@ -189,7 +200,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test"
"runId": "feast-repository-test",
"lastRunId": "no-run-id-provided"
}
},
{
Expand All @@ -204,6 +216,7 @@
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"customProperties": {},
"dataType": "CONTINUOUS",
"sources": [
"urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)",
Expand All @@ -216,7 +229,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test"
"runId": "feast-repository-test",
"lastRunId": "no-run-id-provided"
}
},
{
Expand All @@ -231,6 +245,7 @@
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"customProperties": {},
"dataType": "CONTINUOUS",
"sources": [
"urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)",
Expand All @@ -243,7 +258,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test"
"runId": "feast-repository-test",
"lastRunId": "no-run-id-provided"
}
},
{
Expand Down Expand Up @@ -278,7 +294,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test"
"runId": "feast-repository-test",
"lastRunId": "no-run-id-provided"
}
},
{
Expand All @@ -297,7 +314,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test"
"runId": "feast-repository-test",
"lastRunId": "no-run-id-provided"
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
join_keys=["driver_id"],
value_type=ValueType.INT64,
description="Driver ID",
owner="Datahub",
tags={"name": "deprecated"},
)

driver_hourly_stats_view = FeatureView(
Expand All @@ -29,7 +31,7 @@
Field(
name="conv_rate",
dtype=feast.types.Float64,
tags=dict(description="Conv rate"),
tags={"name": "needs_documentation", "description": "Conv rate"},
),
Field(
name="acc_rate",
Expand All @@ -49,7 +51,8 @@
],
online=True,
source=driver_hourly_stats_source,
tags={},
tags={"name": "deprecated"},
owner="Datahub",
)

input_request = RequestSource(
Expand Down
Loading