diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java index 2be719ed263ea..06545ef3525dd 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java @@ -200,10 +200,19 @@ private void handleSourceDatasetEvent(MetadataChangeLog event, DatasetUrn source UpstreamLineage upstreamLineage = getUpstreamLineageFromEvent(event); if (upstreamLineage != null && upstreamLineage.hasUpstreams()) { UpstreamArray upstreams = upstreamLineage.getUpstreams(); - if ( - upstreams.size() == 1 - && upstreams.get(0).getDataset().getPlatformEntity().getPlatformNameEntity().equals(DBT_PLATFORM_NAME)) { - setSiblingsAndSoftDeleteSibling(upstreams.get(0).getDataset(), sourceUrn); + + // an entity can have merged lineage (eg. dbt + snowflake), but by default siblings are only between dbt <> non-dbt + UpstreamArray dbtUpstreams = new UpstreamArray( + upstreams.stream() + .filter(obj -> obj.getDataset().getPlatformEntity().getPlatformNameEntity().equals(DBT_PLATFORM_NAME)) + .collect(Collectors.toList()) + ); + // We're assuming a data asset (eg. snowflake table) will only ever be downstream of 1 dbt model + if (dbtUpstreams.size() == 1) { + setSiblingsAndSoftDeleteSibling(dbtUpstreams.get(0).getDataset(), sourceUrn); + } else { + log.error("{} has an unexpected number of dbt upstreams: {}. Not adding any as siblings.", sourceUrn.toString(), dbtUpstreams.size()); + } } } @@ -219,7 +228,7 @@ private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) { existingDbtSiblingAspect != null && existingSourceSiblingAspect != null && existingDbtSiblingAspect.getSiblings().contains(sourceUrn.toString()) - && existingDbtSiblingAspect.getSiblings().contains(dbtUrn.toString()) + && existingSourceSiblingAspect.getSiblings().contains(dbtUrn.toString()) ) { // we have already connected them- we can abort here return; diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHookTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHookTest.java index 5fb2cfaaef2d1..78d304d67bfc0 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHookTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHookTest.java @@ -36,6 +36,8 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.net.URISyntaxException; + import static com.linkedin.metadata.Constants.*; import static org.mockito.ArgumentMatchers.*; @@ -78,15 +80,12 @@ public void testInvokeWhenThereIsAPairWithDbtSourceNode() throws Exception { _mockAuthentication )).thenReturn(mockResponse); - MetadataChangeLog event = new MetadataChangeLog(); - event.setEntityType(DATASET_ENTITY_NAME); - event.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME); - event.setChangeType(ChangeType.UPSERT); + + MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT); + + Upstream upstream = createUpstream("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)", DatasetLineageType.TRANSFORMED); final UpstreamLineage upstreamLineage = new UpstreamLineage(); final UpstreamArray upstreamArray = new UpstreamArray(); - final Upstream upstream = new Upstream(); - upstream.setType(DatasetLineageType.TRANSFORMED); - upstream.setDataset(DatasetUrn.createFromString("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)")); upstreamArray.add(upstream); upstreamLineage.setUpstreams(upstreamArray); @@ -151,15 +150,11 @@ public void testInvokeWhenThereIsNoPairWithDbtModel() throws Exception { _mockAuthentication )).thenReturn(mockResponse); - MetadataChangeLog event = new MetadataChangeLog(); - event.setEntityType(DATASET_ENTITY_NAME); - event.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME); - event.setChangeType(ChangeType.UPSERT); + MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT); + Upstream upstream = createUpstream("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)", DatasetLineageType.TRANSFORMED); + final UpstreamLineage upstreamLineage = new UpstreamLineage(); final UpstreamArray upstreamArray = new UpstreamArray(); - final Upstream upstream = new Upstream(); - upstream.setType(DatasetLineageType.TRANSFORMED); - upstream.setDataset(DatasetUrn.createFromString("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)")); upstreamArray.add(upstream); upstreamLineage.setUpstreams(upstreamArray); @@ -189,15 +184,11 @@ public void testInvokeWhenThereIsNoPairWithDbtModel() throws Exception { public void testInvokeWhenThereIsAPairWithBigqueryDownstreamNode() throws Exception { Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true); - MetadataChangeLog event = new MetadataChangeLog(); - event.setEntityType(DATASET_ENTITY_NAME); - event.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME); - event.setChangeType(ChangeType.UPSERT); + + MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT); final UpstreamLineage upstreamLineage = new UpstreamLineage(); final UpstreamArray upstreamArray = new UpstreamArray(); - final Upstream upstream = new Upstream(); - upstream.setType(DatasetLineageType.TRANSFORMED); - upstream.setDataset(DatasetUrn.createFromString("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)")); + Upstream upstream = createUpstream("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)", DatasetLineageType.TRANSFORMED); upstreamArray.add(upstream); upstreamLineage.setUpstreams(upstreamArray); @@ -259,10 +250,7 @@ public void testInvokeWhenThereIsAKeyBeingReingested() throws Exception { .setSkipAggregates(true).setSkipHighlighting(true)) )).thenReturn(returnSearchResult); - MetadataChangeLog event = new MetadataChangeLog(); - event.setEntityType(DATASET_ENTITY_NAME); - event.setAspectName(DATASET_KEY_ASPECT_NAME); - event.setChangeType(ChangeType.UPSERT); + MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, DATASET_KEY_ASPECT_NAME, ChangeType.UPSERT); final DatasetKey datasetKey = new DatasetKey(); datasetKey.setName("my-proj.jaffle_shop.customers"); datasetKey.setOrigin(FabricType.PROD); @@ -304,4 +292,76 @@ public void testInvokeWhenThereIsAKeyBeingReingested() throws Exception { Mockito.eq(_mockAuthentication) ); } -} + @Test + public void testInvokeWhenSourceUrnHasTwoDbtUpstreams() throws Exception { + + MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT); + final UpstreamLineage upstreamLineage = new UpstreamLineage(); + final UpstreamArray upstreamArray = new UpstreamArray(); + Upstream dbtUpstream1 = createUpstream("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.source_entity1,PROD)", DatasetLineageType.TRANSFORMED); + Upstream dbtUpstream2 = createUpstream("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.source_entity2,PROD)", DatasetLineageType.TRANSFORMED); + upstreamArray.add(dbtUpstream1); + upstreamArray.add(dbtUpstream2); + upstreamLineage.setUpstreams(upstreamArray); + + event.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage)); + event.setEntityUrn(Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)")); + _siblingAssociationHook.invoke(event); + + + Mockito.verify(_mockEntityClient, Mockito.times(0)).ingestProposal( + Mockito.any(), + Mockito.eq(_mockAuthentication) + ); + + + } + + @Test + public void testInvokeWhenSourceUrnHasTwoUpstreamsOneDbt() throws Exception { + + MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT); + final UpstreamLineage upstreamLineage = new UpstreamLineage(); + final UpstreamArray upstreamArray = new UpstreamArray(); + Upstream dbtUpstream = createUpstream("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.source_entity1,PROD)", DatasetLineageType.TRANSFORMED); + Upstream snowflakeUpstream = + createUpstream("urn:li:dataset:(urn:li:dataPlatform:snowflake,my-proj.jaffle_shop.customers,PROD)", DatasetLineageType.TRANSFORMED); + upstreamArray.add(dbtUpstream); + upstreamArray.add(snowflakeUpstream); + upstreamLineage.setUpstreams(upstreamArray); + + event.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage)); + event.setEntityUrn(Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)")); + _siblingAssociationHook.invoke(event); + + + Mockito.verify(_mockEntityClient, Mockito.times(2)).ingestProposal( + Mockito.any(), + Mockito.eq(_mockAuthentication) + ); + + + } + + private MetadataChangeLog createEvent(String entityType, String aspectName, ChangeType changeType) { + MetadataChangeLog event = new MetadataChangeLog(); + event.setEntityType(entityType); + event.setAspectName(aspectName); + event.setChangeType(changeType); + return event; + } + private Upstream createUpstream(String urn, DatasetLineageType upstreamType) { + + final Upstream upstream = new Upstream(); + upstream.setType(upstreamType); + try { + upstream.setDataset(DatasetUrn.createFromString(urn)); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + return upstream; + } + + + }