Skip to content

Commit

Permalink
fix(metadata-io): add unit tests for getLineage queries
Browse files Browse the repository at this point in the history
  • Loading branch information
lix-mms committed Aug 21, 2023
1 parent 23a9d05 commit d01fa6c
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 2 deletions.
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ buildscript {
ext.springBootVersion = '2.7.14'
ext.openTelemetryVersion = '1.18.0'
ext.neo4jVersion = '4.4.9'
ext.neo4jApocVersion = '4.4.0.9:all'
ext.testContainersVersion = '1.17.4'
ext.elasticsearchVersion = '7.10.2'
// TODO: Change to final release version once it's out ETA Mid-April
Expand Down Expand Up @@ -156,6 +157,7 @@ project.ext.externalDependency = [
'mysqlConnector': 'mysql:mysql-connector-java:8.0.20',
'neo4jHarness': 'org.neo4j.test:neo4j-harness:' + neo4jVersion,
'neo4jJavaDriver': 'org.neo4j.driver:neo4j-java-driver:' + neo4jVersion,
'neo4jApoc': 'org.neo4j.procedure:apoc:' + neo4jApocVersion,
'opentelemetryApi': 'io.opentelemetry:opentelemetry-api:' + openTelemetryVersion,
'opentelemetryAnnotations': 'io.opentelemetry:opentelemetry-extension-annotations:' + openTelemetryVersion,
'opentracingJdbc':'io.opentracing.contrib:opentracing-jdbc:0.2.15',
Expand Down Expand Up @@ -214,7 +216,7 @@ project.ext.externalDependency = [
'common': 'commons-io:commons-io:2.7',
'jline':'jline:jline:1.4.1',
'jetbrains':' org.jetbrains.kotlin:kotlin-stdlib:1.6.0'

]

allprojects {
Expand Down
3 changes: 3 additions & 0 deletions metadata-io/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ dependencies {
testCompile externalDependency.h2
testCompile externalDependency.mysqlConnector
testCompile externalDependency.neo4jHarness
testCompile (externalDependency.neo4jApoc) {
exclude group: 'org.yaml', module: 'snakeyaml'
}
testCompile externalDependency.mockito
testCompile externalDependency.mockitoInline
testCompile externalDependency.iStackCommons
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.metadata.graph.neo4j;

import com.linkedin.common.FabricType;
import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.TagUrn;
Expand All @@ -17,6 +18,7 @@
import com.linkedin.metadata.query.filter.RelationshipFilter;
import java.util.Arrays;
import java.util.Collections;

import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.testng.SkipException;
Expand All @@ -29,6 +31,8 @@
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static com.linkedin.metadata.search.utils.QueryUtils.*;
import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -194,33 +198,159 @@ public void testRemoveEdge() throws Exception {
assertEquals(result.getTotal(), 0);
}

private Set<UrnArray> getPathUrnArraysFromLineageResult(EntityLineageResult result) {
return result.getRelationships()
.stream()
.map(x -> x.getPaths().get(0))
.collect(Collectors.toSet());
}

@Test
public void testGetLineage() {
GraphService service = getGraphService();

List<Edge> edges = Arrays.asList(
// d1 <-Consumes- dj1 -Produces-> d2 <-DownstreamOf- d3 <-DownstreamOf- d5
new Edge(dataJobOneUrn, datasetOneUrn, consumes, 1L, null, 3L, null, null),
new Edge(dataJobOneUrn, datasetTwoUrn, produces, 5L, null, 7L, null, null),
new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, 9L, null, null, null, null),
new Edge(datasetFiveUrn, datasetThreeUrn, downstreamOf, 11L, null, null, null, null),

// another path between d2 and d5 which is shorter
// d1 <-DownstreamOf- d4 <-DownstreamOf- d5
new Edge(datasetFourUrn, datasetOneUrn, downstreamOf, 13L, null, 13L, null, null),
new Edge(datasetFiveUrn, datasetFourUrn, downstreamOf, 13L, null, 13L, null, null)
);
edges.forEach(service::addEdge);

// simple path finding
final var upstreamLineageDataset3Hop3 = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 3);
assertEquals(upstreamLineageDataset3Hop3.getTotal().intValue(), 3);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageDataset3Hop3),
Set.of(
new UrnArray(datasetThreeUrn, datasetTwoUrn),
new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn),
new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn, datasetOneUrn)));

// simple path finding
final var upstreamLineageDatasetFiveHop2 = service.getLineage(datasetFiveUrn, LineageDirection.UPSTREAM, 0, 1000, 2);
assertEquals(upstreamLineageDatasetFiveHop2.getTotal().intValue(), 4);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageDatasetFiveHop2),
Set.of(
new UrnArray(datasetFiveUrn, datasetThreeUrn),
new UrnArray(datasetFiveUrn, datasetThreeUrn, datasetTwoUrn),
new UrnArray(datasetFiveUrn, datasetFourUrn),
new UrnArray(datasetFiveUrn, datasetFourUrn, datasetOneUrn)));

// there are two paths from p5 to p1, one longer and one shorter, and the longer one is discarded from result
final var upstreamLineageDataset5Hop5 = service.getLineage(datasetFiveUrn, LineageDirection.UPSTREAM, 0, 1000, 5);
assertEquals(upstreamLineageDataset5Hop5.getTotal().intValue(), 5);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageDataset5Hop5),
Set.of(
new UrnArray(datasetFiveUrn, datasetThreeUrn),
new UrnArray(datasetFiveUrn, datasetThreeUrn, datasetTwoUrn),
new UrnArray(datasetFiveUrn, datasetThreeUrn, datasetTwoUrn, dataJobOneUrn),
new UrnArray(datasetFiveUrn, datasetFourUrn),
new UrnArray(datasetFiveUrn, datasetFourUrn, datasetOneUrn)));

// downstream lookup
final var downstreamLineageDataset1Hop2 = service.getLineage(datasetOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2);
assertEquals(downstreamLineageDataset1Hop2.getTotal().intValue(), 4);
assertEquals(
getPathUrnArraysFromLineageResult(downstreamLineageDataset1Hop2),
Set.of(
new UrnArray(datasetOneUrn, dataJobOneUrn),
new UrnArray(datasetOneUrn, dataJobOneUrn, datasetTwoUrn),
new UrnArray(datasetOneUrn, datasetFourUrn),
new UrnArray(datasetOneUrn, datasetFourUrn, datasetFiveUrn)));
}

@Test
public void testGetLineageTimeFilterQuery() throws Exception {
GraphService service = getGraphService();

List<Edge> edges = Arrays.asList(
// d1 <-Consumes- dj1 -Produces-> d2 <-DownstreamOf- d3 <-DownstreamOf- d4
new Edge(dataJobOneUrn, datasetOneUrn, consumes, 1L, null, 3L, null, null),
new Edge(dataJobOneUrn, datasetTwoUrn, produces, 5L, null, 7L, null, null),
new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, 9L, null, null, null, null),
new Edge(datasetFourUrn, datasetThreeUrn, downstreamOf, 11L, null, null, null, null)
);
edges.forEach(service::addEdge);

// no time filtering
EntityLineageResult upstreamLineageTwoHops = service.getLineage(datasetFourUrn, LineageDirection.UPSTREAM, 0, 1000, 2);
assertEquals(upstreamLineageTwoHops.getTotal().intValue(), 2);
assertEquals(upstreamLineageTwoHops.getRelationships().size(), 2);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageTwoHops),
Set.of(
new UrnArray(datasetFourUrn, datasetThreeUrn),
new UrnArray(datasetFourUrn, datasetThreeUrn, datasetTwoUrn)));

// with time filtering
EntityLineageResult upstreamLineageTwoHopsWithTimeFilter = service.getLineage(datasetFourUrn, LineageDirection.UPSTREAM, 0, 1000, 2, 10L, 12L);
assertEquals(upstreamLineageTwoHopsWithTimeFilter.getTotal().intValue(), 1);
assertEquals(upstreamLineageTwoHopsWithTimeFilter.getRelationships().size(), 1);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageTwoHopsWithTimeFilter),
Set.of(
new UrnArray(datasetFourUrn, datasetThreeUrn)));

// with time filtering
EntityLineageResult upstreamLineageTimeFilter = service.getLineage(datasetTwoUrn, LineageDirection.UPSTREAM, 0, 1000, 4, 2L, 6L);
assertEquals(upstreamLineageTimeFilter.getTotal().intValue(), 2);
assertEquals(upstreamLineageTimeFilter.getRelationships().size(), 2);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageTimeFilter),
Set.of(
new UrnArray(datasetTwoUrn, dataJobOneUrn),
new UrnArray(datasetTwoUrn, dataJobOneUrn, datasetOneUrn)));

// with time filtering
EntityLineageResult downstreamLineageTimeFilter = service.getLineage(datasetOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 4, 0L, 4L);
assertEquals(downstreamLineageTimeFilter.getTotal().intValue(), 1);
assertEquals(downstreamLineageTimeFilter.getRelationships().size(), 1);
assertEquals(
getPathUrnArraysFromLineageResult(downstreamLineageTimeFilter),
Set.of(
new UrnArray(datasetOneUrn, dataJobOneUrn)));
}

@Test
public void testGetLineageTimeFilteringSkipsShorterButNonMatchingPaths() {
GraphService service = getGraphService();

List<Edge> edges = Arrays.asList(
// d1 <-Consumes- dj1 -Produces-> d2 <-DownstreamOf- d3
new Edge(dataJobOneUrn, datasetOneUrn, consumes, 5L, null, 5L, null, null),
new Edge(dataJobOneUrn, datasetTwoUrn, produces, 7L, null, 7L, null, null),
new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, 9L, null, null, null, null),

// d1 <-DownstreamOf- d3 (shorter path from d3 to d1, but with very old time)
new Edge(datasetThreeUrn, datasetOneUrn, downstreamOf, 1L, null, 2L, null, null)
);
edges.forEach(service::addEdge);

// no time filtering, shorter path from d3 to d1 is returned
EntityLineageResult upstreamLineageNoTimeFiltering = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 3);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageNoTimeFiltering),
Set.of(
new UrnArray(datasetThreeUrn, datasetTwoUrn),
new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn),
new UrnArray(datasetThreeUrn, datasetOneUrn)));

// with time filtering, shorter path from d3 to d1 is excluded so longer path is returned
EntityLineageResult upstreamLineageTimeFiltering = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 3, 3L, 17L);
assertEquals(
getPathUrnArraysFromLineageResult(upstreamLineageTimeFiltering),
Set.of(
new UrnArray(datasetThreeUrn, datasetTwoUrn),
new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn),
new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn, datasetOneUrn)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.io.File;
import java.net.URI;

import apoc.path.PathExplorer;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.harness.Neo4j;
import org.neo4j.harness.Neo4jBuilder;
Expand All @@ -17,7 +19,9 @@ private Neo4jTestServerBuilder(Neo4jBuilder builder) {
}

public Neo4jTestServerBuilder() {
this(new InProcessNeo4jBuilder());
this(new InProcessNeo4jBuilder()
.withProcedure(PathExplorer.class)
);
}

public Neo4jTestServerBuilder(File workingDirectory) {
Expand Down

0 comments on commit d01fa6c

Please sign in to comment.