From 7d4b645225953f3218c5bf891637582f93f3fbcb Mon Sep 17 00:00:00 2001
From: Shirshanka Das
Date: Tue, 30 Jul 2024 16:04:20 -0700
Subject: [PATCH] =?UTF-8?q?fix(models):=20chart=20schema=20fields=20mappin?=
 =?UTF-8?q?g,=20add=20dataHubAction=20entity,=20t=E2=80=A6=20(#11040)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../chart/mappers/InputFieldsMapper.java      |  18 +-
 .../metadata/key/DataHubActionKey.pdl         |  14 ++
 .../src/main/resources/entity-registry.yml    |  26 +++
 smoke-test/tests/schema_fields/__init__.py    |   0
 .../schema_fields/queries/get_chart_field.gql |  20 ++
 .../tests/schema_fields/test_schemafields.py  | 177 ++++++++++++++++++
 6 files changed, 251 insertions(+), 4 deletions(-)
 create mode 100644 metadata-models/src/main/pegasus/com/linkedin/metadata/key/DataHubActionKey.pdl
 create mode 100644 smoke-test/tests/schema_fields/__init__.py
 create mode 100644 smoke-test/tests/schema_fields/queries/get_chart_field.gql
 create mode 100644 smoke-test/tests/schema_fields/test_schemafields.py

diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/chart/mappers/InputFieldsMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/chart/mappers/InputFieldsMapper.java
index 49c2d17ce0958..a4e40750f0d65 100644
--- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/chart/mappers/InputFieldsMapper.java
+++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/chart/mappers/InputFieldsMapper.java
@@ -5,10 +5,14 @@
 import com.linkedin.datahub.graphql.QueryContext;
 import com.linkedin.datahub.graphql.generated.InputField;
 import com.linkedin.datahub.graphql.types.dataset.mappers.SchemaFieldMapper;
+import java.net.URISyntaxException;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
 
+@Slf4j
 public class InputFieldsMapper {
 
   public static final InputFieldsMapper INSTANCE = new InputFieldsMapper();
@@ -31,13 +35,19 @@ public com.linkedin.datahub.graphql.generated.InputFields apply(
         .map(
             field -> {
               InputField fieldResult = new InputField();
+              Urn parentUrn = entityUrn;
 
-              if (field.hasSchemaField()) {
-                fieldResult.setSchemaField(
-                    SchemaFieldMapper.map(context, field.getSchemaField(), entityUrn));
-              }
               if (field.hasSchemaFieldUrn()) {
                 fieldResult.setSchemaFieldUrn(field.getSchemaFieldUrn().toString());
+                try {
+                  parentUrn = Urn.createFromString(field.getSchemaFieldUrn().getEntityKey().get(0));
+                } catch (URISyntaxException e) {
+                  log.error("Field urn resolution: failed to extract parentUrn successfully from {}. Falling back to {}", field.getSchemaFieldUrn(), entityUrn, e);
+                }
+              }
+              if (field.hasSchemaField()) {
+                fieldResult.setSchemaField(
+                    SchemaFieldMapper.map(context, field.getSchemaField(), parentUrn));
               }
               return fieldResult;
             })
diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/key/DataHubActionKey.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/key/DataHubActionKey.pdl
new file mode 100644
index 0000000000000..8205ecbb80716
--- /dev/null
+++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/key/DataHubActionKey.pdl
@@ -0,0 +1,14 @@
+namespace com.linkedin.metadata.key
+
+/**
+ * Key for a DataHub Action Pipeline
+ */
+@Aspect = {
+  "name": "dataHubActionKey"
+}
+record DataHubActionKey {
+  /**
+   * A unique id for the Action, either generated or provided
+   */
+  id: string
+}
\ No newline at end of file
diff --git a/metadata-models/src/main/resources/entity-registry.yml b/metadata-models/src/main/resources/entity-registry.yml
index c9f9a851ccc08..f8520990a0984 100644
--- a/metadata-models/src/main/resources/entity-registry.yml
+++ b/metadata-models/src/main/resources/entity-registry.yml
@@ -75,6 +75,7 @@ entities:
       - forms
       - subTypes
      - incidentsSummary
+      - testResults
   - name: dataFlow
     category: core
     keyAspect: dataFlowKey
@@ -96,12 +97,14 @@ entities:
       - incidentsSummary
       - forms
       - subTypes
+      - testResults
   - name: dataProcess
     keyAspect: dataProcessKey
     aspects:
       - dataProcessInfo
       - ownership
       - status
+      - testResults
   - name: dataProcessInstance
     doc: DataProcessInstance represents an instance of a datajob/jobflow run
     keyAspect: dataProcessInstanceKey
@@ -112,6 +115,7 @@ entities:
       - dataProcessInstanceRelationships
       - dataProcessInstanceRunEvent
       - status
+      - testResults
   - name: chart
     category: core
     keyAspect: chartKey
@@ -137,6 +141,7 @@ entities:
       - structuredProperties
       - incidentsSummary
       - forms
+      - testResults
   - name: dashboard
     keyAspect: dashboardKey
     aspects:
@@ -160,6 +165,7 @@ entities:
       - structuredProperties
       - incidentsSummary
       - forms
+      - testResults
   - name: notebook
     doc: Notebook represents a combination of query, text, chart and etc. This is in BETA version
     keyAspect: notebookKey
@@ -177,6 +183,7 @@ entities:
       - subTypes
       - dataPlatformInstance
       - browsePathsV2
+      - testResults
   - name: corpuser
     doc: CorpUser represents an identity of a person (or an account) in the enterprise.
     keyAspect: corpUserKey
@@ -194,6 +201,7 @@ entities:
       - roleMembership
       - structuredProperties
       - forms
+      - testResults
   - name: corpGroup
     doc: CorpGroup represents an identity of a group of users in the enterprise.
     keyAspect: corpGroupKey
@@ -207,6 +215,7 @@ entities:
       - roleMembership
       - structuredProperties
       - forms
+      - testResults
   - name: domain
     doc: A data domain within an organization.
     category: core
@@ -217,6 +226,7 @@ entities:
       - ownership
       - structuredProperties
       - forms
+      - testResults
   - name: container
     doc: A container of related data assets.
     category: core
@@ -237,6 +247,7 @@ entities:
       - browsePathsV2
       - structuredProperties
       - forms
+      - testResults
   - name: tag
     category: core
     keyAspect: tagKey
@@ -245,6 +256,7 @@ entities:
       - ownership
       - deprecation
       - status
+      - testResults
   - name: glossaryTerm
     category: core
     keyAspect: glossaryTermKey
@@ -260,6 +272,7 @@ entities:
       - browsePaths
       - structuredProperties
       - forms
+      - testResults
   - name: glossaryNode
     category: core
     keyAspect: glossaryNodeKey
@@ -270,6 +283,7 @@ entities:
       - status
       - structuredProperties
       - forms
+      - testResults
   - name: dataHubIngestionSource
     category: internal
     keyAspect: dataHubIngestionSourceKey
@@ -341,6 +355,7 @@ entities:
       - browsePathsV2
       - structuredProperties
       - forms
+      - testResults
   - name: mlModelGroup
     category: core
     keyAspect: mlModelGroupKey
@@ -358,6 +373,7 @@ entities:
       - browsePathsV2
       - structuredProperties
       - forms
+      - testResults
   - name: mlModelDeployment
     category: core
     keyAspect: mlModelDeploymentKey
@@ -368,6 +384,7 @@ entities:
       - deprecation
       - globalTags
       - dataPlatformInstance
+      - testResults
   - name: mlFeatureTable
     category: core
     keyAspect: mlFeatureTableKey
@@ -386,6 +403,7 @@ entities:
       - browsePathsV2
       - structuredProperties
       - forms
+      - testResults
   - name: mlFeature
     category: core
     keyAspect: mlFeatureKey
@@ -404,6 +422,7 @@ entities:
       - browsePathsV2
       - structuredProperties
       - forms
+      - testResults
   - name: mlPrimaryKey
     category: core
     keyAspect: mlPrimaryKeyKey
@@ -420,6 +439,7 @@ entities:
       - dataPlatformInstance
       - structuredProperties
       - forms
+      - testResults
   - name: telemetry
     category: internal
     keyAspect: telemetryKey
@@ -456,6 +476,7 @@ entities:
       - forms
       - businessAttributes
       - documentation
+      - testResults
   - name: globalSettings
     doc: Global settings for an the platform
     category: internal
@@ -523,6 +544,7 @@ entities:
       - status
       - structuredProperties
       - forms
+      - testResults
   - name: ownershipType
     doc: Ownership Type represents a user-created ownership category for a person or group who is responsible for an asset.
     category: core
@@ -550,6 +572,10 @@ entities:
     keyAspect: dataHubPersonaKey
     aspects:
       - dataHubPersonaInfo
+  - name: dataHubAction
+    category: internal
+    keyAspect: dataHubActionKey
+    aspects: []
   - name: entityType
     doc: A type of entity in the DataHub Metadata Model.
     category: core
diff --git a/smoke-test/tests/schema_fields/__init__.py b/smoke-test/tests/schema_fields/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/smoke-test/tests/schema_fields/queries/get_chart_field.gql b/smoke-test/tests/schema_fields/queries/get_chart_field.gql
new file mode 100644
index 0000000000000..424e5ad686ab9
--- /dev/null
+++ b/smoke-test/tests/schema_fields/queries/get_chart_field.gql
@@ -0,0 +1,20 @@
+query($urn:String!) {
+  chart(urn: $urn) {
+    inputFields {
+      fields {
+        schemaFieldUrn
+        schemaField {
+          schemaFieldEntity {
+            urn
+            fieldPath
+            documentation {
+              documentations {
+                documentation
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/smoke-test/tests/schema_fields/test_schemafields.py b/smoke-test/tests/schema_fields/test_schemafields.py
new file mode 100644
index 0000000000000..31f237308e2a8
--- /dev/null
+++ b/smoke-test/tests/schema_fields/test_schemafields.py
@@ -0,0 +1,177 @@
+import logging
+import os
+import tempfile
+import time
+from random import randint
+
+import datahub.metadata.schema_classes as models
+import pytest
+from datahub.emitter.mce_builder import (
+    make_dataset_urn,
+    make_schema_field_urn,
+)
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
+from datahub.ingestion.api.sink import NoopWriteCallback
+from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
+from datahub.ingestion.sink.file import FileSink, FileSinkConfig
+
+from tests.utils import (
+    delete_urns_from_file,
+    get_gms_url,
+    get_sleep_info,
+    ingest_file_via_rest,
+    wait_for_writes_to_sync,
+)
+
+logger = logging.getLogger(__name__)
+
+
+start_index = randint(10, 10000)
+dataset_urns = [
+    make_dataset_urn("snowflake", f"table_foo_{i}")
+    for i in range(start_index, start_index + 10)
+]
+
+
+class FileEmitter:
+    def __init__(self, filename: str) -> None:
+        self.sink: FileSink = FileSink(
+            ctx=PipelineContext(run_id="create_test_data"),
+            config=FileSinkConfig(filename=filename),
+        )
+
+    def emit(self, event):
+        self.sink.write_record_async(
+            record_envelope=RecordEnvelope(record=event, metadata={}),
+            write_callback=NoopWriteCallback(),
+        )
+
+    def close(self):
+        self.sink.close()
+
+
+@pytest.fixture(scope="module")
+def chart_urn():
+    return "urn:li:chart:(looker,chart_foo)"
+
+
+@pytest.fixture(scope="module")
+def upstream_schema_field_urn():
+    return make_schema_field_urn(make_dataset_urn("snowflake", "table_bar"), "field1")
+
+
+def create_test_data(filename: str, chart_urn: str, upstream_schema_field_urn: str):
+    documentation_mcp = MetadataChangeProposalWrapper(
+        entityUrn=upstream_schema_field_urn,
+        aspect=models.DocumentationClass(
+            documentations=[
+                models.DocumentationAssociationClass(
+                    documentation="test documentation",
+                    attribution=models.MetadataAttributionClass(
+                        time=int(time.time() * 1000),
+                        actor="urn:li:corpuser:datahub",
+                        source="urn:li:dataHubAction:documentation_propagation",
+                    ),
+                )
+            ]
+        ),
+    )
+
+    input_fields_mcp = MetadataChangeProposalWrapper(
+        entityUrn=chart_urn,
+        aspect=models.InputFieldsClass(
+            fields=[
+                models.InputFieldClass(
+                    schemaFieldUrn=upstream_schema_field_urn,
+                    schemaField=models.SchemaFieldClass(
+                        fieldPath="field1",
+                        type=models.SchemaFieldDataTypeClass(models.StringTypeClass()),
+                        nativeDataType="STRING",
+                    ),
+                )
+            ]
+        ),
+    )
+
+    file_emitter = FileEmitter(filename)
+    for mcps in [documentation_mcp, input_fields_mcp]:
+        file_emitter.emit(mcps)
+
+    file_emitter.close()
+
+
+sleep_sec, sleep_times = get_sleep_info()
+
+
+@pytest.fixture(scope="module", autouse=False)
+def ingest_cleanup_data(request, chart_urn, upstream_schema_field_urn):
+    new_file, filename = tempfile.mkstemp(suffix=".json")
+    try:
+        create_test_data(filename, chart_urn, upstream_schema_field_urn)
+        print("ingesting schema fields test data")
+        ingest_file_via_rest(filename)
+        yield
+        print("removing schema fields test data")
+        delete_urns_from_file(filename)
+        wait_for_writes_to_sync()
+    finally:
+        os.remove(filename)
+
+
+@pytest.mark.dependency()
+def test_healthchecks(wait_for_healthchecks):
+    # Call to wait_for_healthchecks fixture will do the actual functionality.
+    pass
+
+
+def get_gql_query(filename: str) -> str:
+    with open(filename) as fp:
+        return fp.read()
+
+
+def validate_schema_field_urn_for_chart(
+    graph: DataHubGraph, chart_urn: str, upstream_schema_field_urn: str
+) -> None:
+    # Validate listing
+    result = graph.execute_graphql(
+        get_gql_query("tests/schema_fields/queries/get_chart_field.gql"),
+        {"urn": chart_urn},
+    )
+    assert "chart" in result
+    assert "inputFields" in result["chart"]
+    assert len(result["chart"]["inputFields"]["fields"]) == 1
+    assert (
+        result["chart"]["inputFields"]["fields"][0]["schemaField"]["schemaFieldEntity"][
+            "urn"
+        ]
+        == upstream_schema_field_urn
+    )
+    assert (
+        result["chart"]["inputFields"]["fields"][0]["schemaField"]["schemaFieldEntity"][
+            "fieldPath"
+        ]
+        == "field1"
+    )
+    assert (
+        result["chart"]["inputFields"]["fields"][0]["schemaFieldUrn"]
+        == upstream_schema_field_urn
+    )
+    assert (
+        result["chart"]["inputFields"]["fields"][0]["schemaField"]["schemaFieldEntity"][
+            "documentation"
+        ]["documentations"][0]["documentation"]
+        == "test documentation"
+    )
+
+
+# @tenacity.retry(
+#     stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
+# )
+@pytest.mark.dependency(depends=["test_healthchecks"])
+def test_schema_field_gql_mapper_for_charts(
+    ingest_cleanup_data, chart_urn, upstream_schema_field_urn
+):
+    graph: DataHubGraph = DataHubGraph(config=DatahubClientConfig(server=get_gms_url()))
+
+    validate_schema_field_urn_for_chart(graph, chart_urn, upstream_schema_field_urn)