diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index e097fd1f221ea..ff4e9a48fe7b2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Dict, Iterable, List, Optional, Tuple, Union +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union import feast.types from feast import ( @@ -42,10 +42,15 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.schema_classes import ( BrowsePathsClass, + GlobalTagsClass, MLFeaturePropertiesClass, MLFeatureTablePropertiesClass, MLPrimaryKeyPropertiesClass, + OwnerClass, + OwnershipClass, + OwnershipTypeClass, StatusClass, + TagAssociationClass, ) # FIXME: ValueType module cannot be used as a type @@ -81,6 +86,14 @@ feast.types.Invalid: MLFeatureDataType.UNKNOWN, } +# FIXME: Update to have more owners +_owner_mapping: Dict[str, Dict[str, Any]] = { + "Datahub": { + "owner_type": builder.OwnerType.GROUP, + "owner_ship_type_class": OwnershipTypeClass.DATAOWNER, + } +} + class FeastRepositorySourceConfig(ConfigModel): path: str = Field(description="Path to Feast repository") @@ -213,14 +226,13 @@ def _get_entity_workunit( """ Generate an MLPrimaryKey work unit for a Feast entity. 
""" - feature_view_name = f"{self.feature_store.project}.{feature_view.name}" + aspects = [StatusClass(removed=False)] + self._get_tags_and_owners(entity) entity_snapshot = MLPrimaryKeySnapshot( urn=builder.make_ml_primary_key_urn(feature_view_name, entity.name), - aspects=[StatusClass(removed=False)], + aspects=aspects, ) - entity_snapshot.aspects.append( MLPrimaryKeyPropertiesClass( description=entity.description, @@ -230,12 +242,10 @@ def _get_entity_workunit( ) mce = MetadataChangeEvent(proposedSnapshot=entity_snapshot) - return MetadataWorkUnit(id=entity.name, mce=mce) def _get_feature_workunit( self, - # FIXME: FeatureView and OnDemandFeatureView cannot be used as a type feature_view: Union[FeatureView, OnDemandFeatureView], field: FeastField, ) -> MetadataWorkUnit: @@ -243,38 +253,34 @@ def _get_feature_workunit( Generate an MLFeature work unit for a Feast feature. """ feature_view_name = f"{self.feature_store.project}.{feature_view.name}" + aspects = [StatusClass(removed=False)] + self._get_tags_and_owners(field) feature_snapshot = MLFeatureSnapshot( urn=builder.make_ml_feature_urn(feature_view_name, field.name), - aspects=[StatusClass(removed=False)], + aspects=aspects, ) feature_sources = [] if isinstance(feature_view, FeatureView): feature_sources = self._get_data_sources(feature_view) elif isinstance(feature_view, OnDemandFeatureView): - if feature_view.source_request_sources is not None: + if feature_view.source_request_sources: for request_source in feature_view.source_request_sources.values(): source_platform, source_name = self._get_data_source_details( request_source ) - feature_sources.append( builder.make_dataset_urn( - source_platform, - source_name, - self.source_config.environment, + source_platform, source_name, self.source_config.environment ) ) - - if feature_view.source_feature_view_projections is not None: + if feature_view.source_feature_view_projections: for ( feature_view_projection ) in 
def _get_tags_and_owners(self, obj: Union[Entity, FeatureView, FeastField]) -> list:
    """
    Build the tag and ownership aspects for a Feast object.

    Only the ``"name"`` entry of ``obj.tags`` is turned into a DataHub tag.
    Ownership is emitted only when the object exposes a non-empty ``owner``
    attribute; it is read with ``getattr`` so objects without that attribute
    simply produce no ownership aspect.

    Returns:
        A list with at most one ``GlobalTagsClass`` followed by at most one
        ``OwnershipClass``. The list is empty when the object carries neither
        a ``"name"`` tag nor an owner.
    """
    aspects: List[Union[GlobalTagsClass, OwnershipClass]] = []

    # Extract tags
    tag_name = obj.tags.get("name") if obj.tags else None
    if tag_name:
        tag_association = TagAssociationClass(tag=builder.make_tag_urn(tag_name))
        global_tags_aspect = GlobalTagsClass(tags=[tag_association])
        aspects.append(global_tags_aspect)

    # Extract owner
    owner = getattr(obj, "owner", None)
    if owner:
        owner_association = self._create_owner_association(owner)
        owners_aspect = OwnershipClass(owners=[owner_association])
        aspects.append(owners_aspect)

    return aspects

def _create_owner_association(self, owner: str) -> OwnerClass:
    """
    Create the ``OwnerClass`` association for a Feast owner name.

    The owner type and ownership type are looked up in ``_owner_mapping``.
    Owners that are not listed there no longer raise ``KeyError`` (which
    aborted the whole ingestion run); they fall back to an individual user
    with ``DATAOWNER`` ownership.

    Args:
        owner: The owner string taken from the Feast object.

    Returns:
        An ``OwnerClass`` whose URN is built with ``builder.make_owner_urn``.
    """
    # A missing key yields an empty mapping so both lookups below use defaults.
    mapping: Dict[str, Any] = _owner_mapping.get(owner, {})
    owner_type: builder.OwnerType = mapping.get(
        "owner_type", builder.OwnerType.USER
    )
    owner_ship_type_class: Union[str, OwnershipTypeClass] = mapping.get(
        "owner_ship_type_class", OwnershipTypeClass.DATAOWNER
    )
    return OwnerClass(
        owner=builder.make_owner_urn(owner, owner_type=owner_type),
        type=owner_ship_type_class,
    )
"no-run-id-provided" } }, { @@ -38,6 +40,7 @@ }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, "description": "Conv rate", "dataType": "CONTINUOUS", "sources": [ @@ -50,7 +53,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -65,6 +69,7 @@ }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, "description": "Acc rate", "dataType": "CONTINUOUS", "sources": [ @@ -77,7 +82,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -92,6 +98,7 @@ }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, "description": "Avg daily trips", "dataType": "ORDINAL", "sources": [ @@ -104,7 +111,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -119,6 +127,7 @@ }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, "description": "String feature", "dataType": "TEXT", "sources": [ @@ -131,7 +140,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -170,7 +180,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -189,7 +200,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -204,6 +216,7 @@ }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, "dataType": 
"CONTINUOUS", "sources": [ "urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)", @@ -216,7 +229,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -231,6 +245,7 @@ }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, "dataType": "CONTINUOUS", "sources": [ "urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)", @@ -243,7 +258,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -278,7 +294,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -297,7 +314,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/feast/feature_store/features.py b/metadata-ingestion/tests/integration/feast/feature_store/features.py index a6e6cd3616e92..f6cb12414bd5d 100644 --- a/metadata-ingestion/tests/integration/feast/feature_store/features.py +++ b/metadata-ingestion/tests/integration/feast/feature_store/features.py @@ -19,6 +19,8 @@ join_keys=["driver_id"], value_type=ValueType.INT64, description="Driver ID", + owner="Datahub", + tags={"name": "deprecated"}, ) driver_hourly_stats_view = FeatureView( @@ -29,7 +31,7 @@ Field( name="conv_rate", dtype=feast.types.Float64, - tags=dict(description="Conv rate"), + tags={"name": "needs_documentation", "description": "Conv rate"}, ), Field( name="acc_rate", @@ -49,7 +51,8 @@ ], online=True, source=driver_hourly_stats_source, - tags={}, + tags={"name": "deprecated"}, + owner="Datahub", ) input_request = RequestSource(