Skip to content

Commit

Permalink
fix(siblingsHook): check number of dbtUpstreams instead of all upStre…
Browse files Browse the repository at this point in the history
…ams (datahub-project#8817)

Co-authored-by: Ethan Cartwright <[email protected]>
  • Loading branch information
ethan-cartwright and ethan-cartwright authored Sep 13, 2023
1 parent 3cc0f76 commit 785ab77
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,19 @@ private void handleSourceDatasetEvent(MetadataChangeLog event, DatasetUrn source
UpstreamLineage upstreamLineage = getUpstreamLineageFromEvent(event);
if (upstreamLineage != null && upstreamLineage.hasUpstreams()) {
UpstreamArray upstreams = upstreamLineage.getUpstreams();
if (
upstreams.size() == 1
&& upstreams.get(0).getDataset().getPlatformEntity().getPlatformNameEntity().equals(DBT_PLATFORM_NAME)) {
setSiblingsAndSoftDeleteSibling(upstreams.get(0).getDataset(), sourceUrn);

// an entity can have merged lineage (eg. dbt + snowflake), but by default siblings are only between dbt <> non-dbt
UpstreamArray dbtUpstreams = new UpstreamArray(
upstreams.stream()
.filter(obj -> obj.getDataset().getPlatformEntity().getPlatformNameEntity().equals(DBT_PLATFORM_NAME))
.collect(Collectors.toList())
);
// We're assuming a data asset (eg. snowflake table) will only ever be downstream of 1 dbt model
if (dbtUpstreams.size() == 1) {
setSiblingsAndSoftDeleteSibling(dbtUpstreams.get(0).getDataset(), sourceUrn);
} else {
log.error("{} has an unexpected number of dbt upstreams: {}. Not adding any as siblings.", sourceUrn.toString(), dbtUpstreams.size());

}
}
}
Expand All @@ -219,7 +228,7 @@ private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) {
existingDbtSiblingAspect != null
&& existingSourceSiblingAspect != null
&& existingDbtSiblingAspect.getSiblings().contains(sourceUrn.toString())
&& existingDbtSiblingAspect.getSiblings().contains(dbtUrn.toString())
&& existingSourceSiblingAspect.getSiblings().contains(dbtUrn.toString())
) {
// we have already connected them- we can abort here
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.net.URISyntaxException;

import static com.linkedin.metadata.Constants.*;
import static org.mockito.ArgumentMatchers.*;

Expand Down Expand Up @@ -78,15 +80,12 @@ public void testInvokeWhenThereIsAPairWithDbtSourceNode() throws Exception {
_mockAuthentication
)).thenReturn(mockResponse);

MetadataChangeLog event = new MetadataChangeLog();
event.setEntityType(DATASET_ENTITY_NAME);
event.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME);
event.setChangeType(ChangeType.UPSERT);

MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT);

Upstream upstream = createUpstream("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)", DatasetLineageType.TRANSFORMED);
final UpstreamLineage upstreamLineage = new UpstreamLineage();
final UpstreamArray upstreamArray = new UpstreamArray();
final Upstream upstream = new Upstream();
upstream.setType(DatasetLineageType.TRANSFORMED);
upstream.setDataset(DatasetUrn.createFromString("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)"));

upstreamArray.add(upstream);
upstreamLineage.setUpstreams(upstreamArray);
Expand Down Expand Up @@ -151,15 +150,11 @@ public void testInvokeWhenThereIsNoPairWithDbtModel() throws Exception {
_mockAuthentication
)).thenReturn(mockResponse);

MetadataChangeLog event = new MetadataChangeLog();
event.setEntityType(DATASET_ENTITY_NAME);
event.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME);
event.setChangeType(ChangeType.UPSERT);
MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT);
Upstream upstream = createUpstream("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)", DatasetLineageType.TRANSFORMED);

final UpstreamLineage upstreamLineage = new UpstreamLineage();
final UpstreamArray upstreamArray = new UpstreamArray();
final Upstream upstream = new Upstream();
upstream.setType(DatasetLineageType.TRANSFORMED);
upstream.setDataset(DatasetUrn.createFromString("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)"));

upstreamArray.add(upstream);
upstreamLineage.setUpstreams(upstreamArray);
Expand Down Expand Up @@ -189,15 +184,11 @@ public void testInvokeWhenThereIsNoPairWithDbtModel() throws Exception {
public void testInvokeWhenThereIsAPairWithBigqueryDownstreamNode() throws Exception {
Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true);

MetadataChangeLog event = new MetadataChangeLog();
event.setEntityType(DATASET_ENTITY_NAME);
event.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME);
event.setChangeType(ChangeType.UPSERT);

MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT);
final UpstreamLineage upstreamLineage = new UpstreamLineage();
final UpstreamArray upstreamArray = new UpstreamArray();
final Upstream upstream = new Upstream();
upstream.setType(DatasetLineageType.TRANSFORMED);
upstream.setDataset(DatasetUrn.createFromString("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)"));
Upstream upstream = createUpstream("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)", DatasetLineageType.TRANSFORMED);

upstreamArray.add(upstream);
upstreamLineage.setUpstreams(upstreamArray);
Expand Down Expand Up @@ -259,10 +250,7 @@ public void testInvokeWhenThereIsAKeyBeingReingested() throws Exception {
.setSkipAggregates(true).setSkipHighlighting(true))
)).thenReturn(returnSearchResult);

MetadataChangeLog event = new MetadataChangeLog();
event.setEntityType(DATASET_ENTITY_NAME);
event.setAspectName(DATASET_KEY_ASPECT_NAME);
event.setChangeType(ChangeType.UPSERT);
MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, DATASET_KEY_ASPECT_NAME, ChangeType.UPSERT);
final DatasetKey datasetKey = new DatasetKey();
datasetKey.setName("my-proj.jaffle_shop.customers");
datasetKey.setOrigin(FabricType.PROD);
Expand Down Expand Up @@ -304,4 +292,76 @@ public void testInvokeWhenThereIsAKeyBeingReingested() throws Exception {
Mockito.eq(_mockAuthentication)
);
}
}
@Test
public void testInvokeWhenSourceUrnHasTwoDbtUpstreams() throws Exception {

MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT);
final UpstreamLineage upstreamLineage = new UpstreamLineage();
final UpstreamArray upstreamArray = new UpstreamArray();
Upstream dbtUpstream1 = createUpstream("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.source_entity1,PROD)", DatasetLineageType.TRANSFORMED);
Upstream dbtUpstream2 = createUpstream("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.source_entity2,PROD)", DatasetLineageType.TRANSFORMED);
upstreamArray.add(dbtUpstream1);
upstreamArray.add(dbtUpstream2);
upstreamLineage.setUpstreams(upstreamArray);

event.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage));
event.setEntityUrn(Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)"));
_siblingAssociationHook.invoke(event);


Mockito.verify(_mockEntityClient, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.eq(_mockAuthentication)
);


}

@Test
public void testInvokeWhenSourceUrnHasTwoUpstreamsOneDbt() throws Exception {

MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT);
final UpstreamLineage upstreamLineage = new UpstreamLineage();
final UpstreamArray upstreamArray = new UpstreamArray();
Upstream dbtUpstream = createUpstream("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.source_entity1,PROD)", DatasetLineageType.TRANSFORMED);
Upstream snowflakeUpstream =
createUpstream("urn:li:dataset:(urn:li:dataPlatform:snowflake,my-proj.jaffle_shop.customers,PROD)", DatasetLineageType.TRANSFORMED);
upstreamArray.add(dbtUpstream);
upstreamArray.add(snowflakeUpstream);
upstreamLineage.setUpstreams(upstreamArray);

event.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage));
event.setEntityUrn(Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)"));
_siblingAssociationHook.invoke(event);


Mockito.verify(_mockEntityClient, Mockito.times(2)).ingestProposal(
Mockito.any(),
Mockito.eq(_mockAuthentication)
);


}

private MetadataChangeLog createEvent(String entityType, String aspectName, ChangeType changeType) {
MetadataChangeLog event = new MetadataChangeLog();
event.setEntityType(entityType);
event.setAspectName(aspectName);
event.setChangeType(changeType);
return event;
}
private Upstream createUpstream(String urn, DatasetLineageType upstreamType) {

final Upstream upstream = new Upstream();
upstream.setType(upstreamType);
try {
upstream.setDataset(DatasetUrn.createFromString(urn));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}

return upstream;
}


}

0 comments on commit 785ab77

Please sign in to comment.