diff --git a/build.gradle b/build.gradle index ae54de07cb81c7..4ff0626099289a 100644 --- a/build.gradle +++ b/build.gradle @@ -7,6 +7,7 @@ buildscript { ext.springBootVersion = '2.7.14' ext.openTelemetryVersion = '1.18.0' ext.neo4jVersion = '4.4.9' + ext.neo4jApocVersion = '4.4.0.9:all' ext.testContainersVersion = '1.17.4' ext.elasticsearchVersion = '7.10.2' // TODO: Change to final release version once it's out ETA Mid-April @@ -156,6 +157,7 @@ project.ext.externalDependency = [ 'mysqlConnector': 'mysql:mysql-connector-java:8.0.20', 'neo4jHarness': 'org.neo4j.test:neo4j-harness:' + neo4jVersion, 'neo4jJavaDriver': 'org.neo4j.driver:neo4j-java-driver:' + neo4jVersion, + 'neo4jApoc': 'org.neo4j.procedure:apoc:' + neo4jApocVersion, 'opentelemetryApi': 'io.opentelemetry:opentelemetry-api:' + openTelemetryVersion, 'opentelemetryAnnotations': 'io.opentelemetry:opentelemetry-extension-annotations:' + openTelemetryVersion, 'opentracingJdbc':'io.opentracing.contrib:opentracing-jdbc:0.2.15', @@ -214,7 +216,7 @@ project.ext.externalDependency = [ 'common': 'commons-io:commons-io:2.7', 'jline':'jline:jline:1.4.1', 'jetbrains':' org.jetbrains.kotlin:kotlin-stdlib:1.6.0' - + ] allprojects { diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle index 507351f933cf00..b91a501c1f231d 100644 --- a/metadata-io/build.gradle +++ b/metadata-io/build.gradle @@ -52,6 +52,9 @@ dependencies { testCompile externalDependency.h2 testCompile externalDependency.mysqlConnector testCompile externalDependency.neo4jHarness + testCompile (externalDependency.neo4jApoc) { + exclude group: 'org.yaml', module: 'snakeyaml' + } testCompile externalDependency.mockito testCompile externalDependency.mockitoInline testCompile externalDependency.iStackCommons diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java index d36f05cfb039dc..6f63209f9c3801 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java @@ -1,6 +1,7 @@ package com.linkedin.metadata.graph.neo4j; import com.linkedin.common.FabricType; +import com.linkedin.common.UrnArray; import com.linkedin.common.urn.DataPlatformUrn; import com.linkedin.common.urn.DatasetUrn; import com.linkedin.common.urn.TagUrn; @@ -17,6 +18,7 @@ import com.linkedin.metadata.query.filter.RelationshipFilter; import java.util.Arrays; import java.util.Collections; + import org.neo4j.driver.Driver; import org.neo4j.driver.GraphDatabase; import org.testng.SkipException; @@ -29,6 +31,8 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import static com.linkedin.metadata.search.utils.QueryUtils.*; import static org.testng.Assert.assertEquals; @@ -194,11 +198,82 @@ public void testRemoveEdge() throws Exception { assertEquals(result.getTotal(), 0); } + private Set getPathUrnArraysFromLineageResult(EntityLineageResult result) { + return result.getRelationships() + .stream() + .map(x -> x.getPaths().get(0)) + .collect(Collectors.toSet()); + } + + @Test + public void testGetLineage() { + GraphService service = getGraphService(); + + List edges = Arrays.asList( + // d1 <-Consumes- dj1 -Produces-> d2 <-DownstreamOf- d3 <-DownstreamOf- d5 + new Edge(dataJobOneUrn, datasetOneUrn, consumes, 1L, null, 3L, null, null), + new Edge(dataJobOneUrn, datasetTwoUrn, produces, 5L, null, 7L, null, null), + new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, 9L, null, null, null, null), + new Edge(datasetFiveUrn, datasetThreeUrn, downstreamOf, 11L, null, null, null, null), + + // another path between d2 and d5 which is shorter + // d1 <-DownstreamOf- d4 <-DownstreamOf- d5 + new Edge(datasetFourUrn, datasetOneUrn, downstreamOf, 13L, null, 13L, null, null), + new Edge(datasetFiveUrn, datasetFourUrn, downstreamOf, 13L, null, 13L, null, null) + ); + edges.forEach(service::addEdge); + + // simple path finding + final var upstreamLineageDataset3Hop3 = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 3); + assertEquals(upstreamLineageDataset3Hop3.getTotal().intValue(), 3); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageDataset3Hop3), + Set.of( + new UrnArray(datasetThreeUrn, datasetTwoUrn), + new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn), + new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn, datasetOneUrn))); + + // simple path finding + final var upstreamLineageDatasetFiveHop2 = service.getLineage(datasetFiveUrn, LineageDirection.UPSTREAM, 0, 1000, 2); + assertEquals(upstreamLineageDatasetFiveHop2.getTotal().intValue(), 4); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageDatasetFiveHop2), + Set.of( + new UrnArray(datasetFiveUrn, datasetThreeUrn), + new UrnArray(datasetFiveUrn, datasetThreeUrn, datasetTwoUrn), + new UrnArray(datasetFiveUrn, datasetFourUrn), + new UrnArray(datasetFiveUrn, datasetFourUrn, datasetOneUrn))); + + // there are two paths from p5 to p1, one longer and one shorter, and the longer one is discarded from result + final var upstreamLineageDataset5Hop5 = service.getLineage(datasetFiveUrn, LineageDirection.UPSTREAM, 0, 1000, 5); + assertEquals(upstreamLineageDataset5Hop5.getTotal().intValue(), 5); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageDataset5Hop5), + Set.of( + new UrnArray(datasetFiveUrn, datasetThreeUrn), + new UrnArray(datasetFiveUrn, datasetThreeUrn, datasetTwoUrn), + new UrnArray(datasetFiveUrn, datasetThreeUrn, datasetTwoUrn, dataJobOneUrn), + new UrnArray(datasetFiveUrn, datasetFourUrn), + new UrnArray(datasetFiveUrn, datasetFourUrn, datasetOneUrn))); + + // downstream lookup + final var downstreamLineageDataset1Hop2 = service.getLineage(datasetOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2); + assertEquals(downstreamLineageDataset1Hop2.getTotal().intValue(), 4); + assertEquals( + getPathUrnArraysFromLineageResult(downstreamLineageDataset1Hop2), + Set.of( + new UrnArray(datasetOneUrn, dataJobOneUrn), + new UrnArray(datasetOneUrn, dataJobOneUrn, datasetTwoUrn), + new UrnArray(datasetOneUrn, datasetFourUrn), + new UrnArray(datasetOneUrn, datasetFourUrn, datasetFiveUrn))); + } + @Test public void testGetLineageTimeFilterQuery() throws Exception { GraphService service = getGraphService(); List edges = Arrays.asList( + // d1 <-Consumes- dj1 -Produces-> d2 <-DownstreamOf- d3 <-DownstreamOf- d4 new Edge(dataJobOneUrn, datasetOneUrn, consumes, 1L, null, 3L, null, null), new Edge(dataJobOneUrn, datasetTwoUrn, produces, 5L, null, 7L, null, null), new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, 9L, null, null, null, null), @@ -206,21 +281,76 @@ public void testGetLineageTimeFilterQuery() throws Exception { ); edges.forEach(service::addEdge); + // no time filtering EntityLineageResult upstreamLineageTwoHops = service.getLineage(datasetFourUrn, LineageDirection.UPSTREAM, 0, 1000, 2); assertEquals(upstreamLineageTwoHops.getTotal().intValue(), 2); assertEquals(upstreamLineageTwoHops.getRelationships().size(), 2); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageTwoHops), + Set.of( + new UrnArray(datasetFourUrn, datasetThreeUrn), + new UrnArray(datasetFourUrn, datasetThreeUrn, datasetTwoUrn))); + // with time filtering EntityLineageResult upstreamLineageTwoHopsWithTimeFilter = service.getLineage(datasetFourUrn, LineageDirection.UPSTREAM, 0, 1000, 2, 10L, 12L); assertEquals(upstreamLineageTwoHopsWithTimeFilter.getTotal().intValue(), 1); assertEquals(upstreamLineageTwoHopsWithTimeFilter.getRelationships().size(), 1); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageTwoHopsWithTimeFilter), + Set.of( + new UrnArray(datasetFourUrn, datasetThreeUrn))); + // with time filtering EntityLineageResult upstreamLineageTimeFilter = service.getLineage(datasetTwoUrn, LineageDirection.UPSTREAM, 0, 1000, 4, 2L, 6L); assertEquals(upstreamLineageTimeFilter.getTotal().intValue(), 2); assertEquals(upstreamLineageTimeFilter.getRelationships().size(), 2); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageTimeFilter), + Set.of( + new UrnArray(datasetTwoUrn, dataJobOneUrn), + new UrnArray(datasetTwoUrn, dataJobOneUrn, datasetOneUrn))); + // with time filtering EntityLineageResult downstreamLineageTimeFilter = service.getLineage(datasetOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 4, 0L, 4L); assertEquals(downstreamLineageTimeFilter.getTotal().intValue(), 1); assertEquals(downstreamLineageTimeFilter.getRelationships().size(), 1); + assertEquals( + getPathUrnArraysFromLineageResult(downstreamLineageTimeFilter), + Set.of( + new UrnArray(datasetOneUrn, dataJobOneUrn))); + } + + @Test + public void testGetLineageTimeFilteringSkipsShorterButNonMatchingPaths() { + GraphService service = getGraphService(); + List edges = Arrays.asList( + // d1 <-Consumes- dj1 -Produces-> d2 <-DownstreamOf- d3 + new Edge(dataJobOneUrn, datasetOneUrn, consumes, 5L, null, 5L, null, null), + new Edge(dataJobOneUrn, datasetTwoUrn, produces, 7L, null, 7L, null, null), + new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, 9L, null, null, null, null), + + // d1 <-DownstreamOf- d3 (shorter path from d3 to d1, but with very old time) + new Edge(datasetThreeUrn, datasetOneUrn, downstreamOf, 1L, null, 2L, null, null) + ); + edges.forEach(service::addEdge); + + // no time filtering, shorter path from d3 to d1 is returned + EntityLineageResult upstreamLineageNoTimeFiltering = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 3); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageNoTimeFiltering), + Set.of( + new UrnArray(datasetThreeUrn, datasetTwoUrn), + new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn), + new UrnArray(datasetThreeUrn, datasetOneUrn))); + + // with time filtering, shorter path from d3 to d1 is excluded so longer path is returned + EntityLineageResult upstreamLineageTimeFiltering = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 3, 3L, 17L); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageTimeFiltering), + Set.of( + new UrnArray(datasetThreeUrn, datasetTwoUrn), + new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn), + new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn, datasetOneUrn))); } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jTestServerBuilder.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jTestServerBuilder.java index 4d6d15255b9222..ba4e4cec379144 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jTestServerBuilder.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jTestServerBuilder.java @@ -2,6 +2,8 @@ import java.io.File; import java.net.URI; + +import apoc.path.PathExplorer; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.harness.Neo4j; import org.neo4j.harness.Neo4jBuilder; @@ -17,7 +19,9 @@ private Neo4jTestServerBuilder(Neo4jBuilder builder) { } public Neo4jTestServerBuilder() { - this(new InProcessNeo4jBuilder()); + this(new InProcessNeo4jBuilder() + .withProcedure(PathExplorer.class) + ); } public Neo4jTestServerBuilder(File workingDirectory) {