Skip to content

Commit

Permalink
fix(search): fix lightning cache enable logic (#8522)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored and yoonhyejin committed Aug 24, 2023
1 parent fe0a602 commit 01baea5
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,14 @@ public LineageSearchResult searchAcrossLineage(@Nonnull Urn sourceUrn, @Nonnull
long numEntities = 0;
String codePath = null;
try {
if (canDoLightning(lineageRelationships, input, inputFilters, sortCriterion)) {
Filter reducedFilters =
SearchUtils.removeCriteria(inputFilters, criterion -> criterion.getField().equals(DEGREE_FILTER_INPUT));

if (canDoLightning(lineageRelationships, input, reducedFilters, sortCriterion)) {
codePath = "lightning";
// use lightning approach to return lineage search results
LineageSearchResult lineageSearchResult = getLightningSearchResult(lineageRelationships,
inputFilters, from, size, new HashSet<>(entities));
reducedFilters, from, size, new HashSet<>(entities));
if (!lineageSearchResult.getEntities().isEmpty()) {
log.debug("Lightning Lineage entity result: {}", lineageSearchResult.getEntities().get(0).toString());
}
Expand All @@ -202,7 +205,7 @@ public LineageSearchResult searchAcrossLineage(@Nonnull Urn sourceUrn, @Nonnull
} else {
codePath = "tortoise";
LineageSearchResult lineageSearchResult = getSearchResultInBatches(lineageRelationships, input,
inputFilters, sortCriterion, from, size, finalFlags);
reducedFilters, sortCriterion, from, size, finalFlags);
if (!lineageSearchResult.getEntities().isEmpty()) {
log.debug("Lineage entity result: {}", lineageSearchResult.getEntities().get(0).toString());
}
Expand Down Expand Up @@ -513,15 +516,13 @@ private Filter buildFilter(@Nonnull Set<Urn> urns, @Nullable Filter inputFilters
if (inputFilters == null) {
return QueryUtils.newFilter(urnMatchCriterion);
}
Filter reducedFilters =
SearchUtils.removeCriteria(inputFilters, criterion -> criterion.getField().equals(DEGREE_FILTER_INPUT));

// Add urn match criterion to each or clause
if (!CollectionUtils.isEmpty(reducedFilters.getOr())) {
for (ConjunctiveCriterion conjunctiveCriterion : reducedFilters.getOr()) {
if (!CollectionUtils.isEmpty(inputFilters.getOr())) {
for (ConjunctiveCriterion conjunctiveCriterion : inputFilters.getOr()) {
conjunctiveCriterion.getAnd().add(urnMatchCriterion);
}
return reducedFilters;
return inputFilters;
}
return QueryUtils.newFilter(urnMatchCriterion);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,8 @@ public static String readResourceFile(@Nonnull Class clazz, @Nonnull String file
}
}

@Nonnull
public static Filter removeCriteria(@Nonnull Filter originalFilter, Predicate<Criterion> shouldRemove) {
if (originalFilter.getOr() != null) {
public static Filter removeCriteria(@Nullable Filter originalFilter, Predicate<Criterion> shouldRemove) {
if (originalFilter != null && originalFilter.getOr() != null) {
return new Filter().setOr(new ConjunctiveCriterionArray(originalFilter.getOr()
.stream()
.map(criteria -> removeCriteria(criteria, shouldRemove))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private void resetService(boolean withCache, boolean withLightingCache) {
searchLineageCacheConfiguration.setTtlSeconds(600L);
searchLineageCacheConfiguration.setLightningThreshold(withLightingCache ? -1 : 300);

_lineageSearchService = new LineageSearchService(
_lineageSearchService = spy(new LineageSearchService(
new SearchService(
new EntityDocCountCache(_entityRegistry, _elasticSearchService, entityDocCountCacheConfiguration),
cachingEntitySearchService,
Expand All @@ -143,7 +143,7 @@ private void resetService(boolean withCache, boolean withLightingCache) {
100,
true),
new SimpleRanker()),
_graphService, _cacheManager.getCache("test"), withCache, searchLineageCacheConfiguration);
_graphService, _cacheManager.getCache("test"), withCache, searchLineageCacheConfiguration));
}

@BeforeMethod
Expand Down Expand Up @@ -371,16 +371,19 @@ public void testLightningSearchService() throws Exception {
assertEquals(searchResult.getNumEntities().intValue(), 1);
assertEquals(searchResult.getEntities().get(0).getEntity(), urn);
assertEquals(searchResult.getEntities().get(0).getDegree().intValue(), 1);
verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());

searchResult = searchAcrossLineage(QueryUtils.newFilter("degree.keyword", "1"), testStar);
assertEquals(searchResult.getNumEntities().intValue(), 1);
assertEquals(searchResult.getEntities().get(0).getEntity(), urn);
assertEquals(searchResult.getEntities().get(0).getDegree().intValue(), 1);
verify(_lineageSearchService, times(2)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());

searchResult = searchAcrossLineage(QueryUtils.newFilter("degree.keyword", "2"), testStar);
assertEquals(searchResult.getNumEntities().intValue(), 0);
assertEquals(searchResult.getEntities().size(), 0);
clearCache(true);
verify(_lineageSearchService, times(3)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
clearCache(true); // resets spy

Urn urn2 = new TestEntityUrn("test2", "urn2", "VALUE_2");
ObjectNode document2 = JsonNodeFactory.instance.objectNode();
Expand All @@ -394,6 +397,7 @@ public void testLightningSearchService() throws Exception {
searchResult = searchAcrossLineage(null, testStar);
assertEquals(searchResult.getNumEntities().intValue(), 1);
assertEquals(searchResult.getEntities().get(0).getEntity(), urn);
verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
clearCache(true);

when(_graphService.getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
Expand All @@ -402,10 +406,12 @@ public void testLightningSearchService() throws Exception {
searchResult = searchAcrossLineage(null, testStar);
assertEquals(searchResult.getNumEntities().intValue(), 1);
assertEquals(searchResult.getEntities().size(), 1);
verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
clearCache(true);

// Test Cache Behavior
Mockito.reset(_graphService);
reset(_graphService);
reset(_lineageSearchService);

// Case 1: Use the maxHops in the cache.
when(_graphService.getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
Expand All @@ -421,16 +427,18 @@ public void testLightningSearchService() throws Exception {
new SearchFlags().setSkipCache(false));

assertEquals(searchResult.getNumEntities().intValue(), 1);
Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
eq(1000), eq(null), eq(null));
verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());

// Hit the cache on second attempt
searchResult = _lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME),
"*", 1000, null, null, 0, 10, null, null,
new SearchFlags().setSkipCache(false));
assertEquals(searchResult.getNumEntities().intValue(), 1);
Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
eq(1000), eq(null), eq(null));
verify(_lineageSearchService, times(2)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());


// Case 2: Use the start and end time in the cache.
Expand All @@ -447,27 +455,32 @@ public void testLightningSearchService() throws Exception {
new SearchFlags().setSkipCache(false));

assertEquals(searchResult.getNumEntities().intValue(), 1);
Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
eq(1000), eq(0L), eq(1L));
verify(_lineageSearchService, times(3)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());

// Hit the cache on second attempt
searchResult = _lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME),
"*", null, null, null, 0, 10, 0L, 1L,
new SearchFlags().setSkipCache(false));
assertEquals(searchResult.getNumEntities().intValue(), 1);
Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
eq(1000), eq(0L), eq(1L));
verify(_lineageSearchService, times(4)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());

/*
* Test filtering
*/
reset(_lineageSearchService);

// Entity
searchResult =
_lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(DATASET_ENTITY_NAME),
"*", 1000, null, null, 0, 10, null, null,
new SearchFlags().setSkipCache(false));
assertEquals(searchResult.getNumEntities().intValue(), 0);
assertEquals(searchResult.getEntities().size(), 0);
verify(_lineageSearchService, times(1)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());

// Cached
searchResult =
Expand All @@ -476,25 +489,35 @@ public void testLightningSearchService() throws Exception {
new SearchFlags().setSkipCache(false));
Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
eq(1000), eq(0L), eq(1L));
verify(_lineageSearchService, times(2)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
assertEquals(searchResult.getNumEntities().intValue(), 0);
assertEquals(searchResult.getEntities().size(), 0);

// Platform
Filter filter = QueryUtils.newFilter("platform", "urn:li:dataPlatform:kafka");
ConjunctiveCriterionArray conCritArr = new ConjunctiveCriterionArray();
Criterion platform1Crit = new Criterion().setField("platform").setValue("urn:li:dataPlatform:kafka").setCondition(Condition.EQUAL);
CriterionArray critArr = new CriterionArray(ImmutableList.of(platform1Crit));
conCritArr.add(new ConjunctiveCriterion().setAnd(critArr));
Criterion degreeCrit = new Criterion().setField("degree.keyword").setValue("2").setCondition(Condition.EQUAL);
conCritArr.add(new ConjunctiveCriterion().setAnd(new CriterionArray(ImmutableList.of(degreeCrit))));
Filter filter = new Filter().setOr(conCritArr);

searchResult =
_lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME),
"*", 1000, filter, null, 0, 10, null, null,
new SearchFlags().setSkipCache(false));
assertEquals(searchResult.getNumEntities().intValue(), 0);
assertEquals(searchResult.getEntities().size(), 0);
verify(_lineageSearchService, times(3)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());

// Cached
searchResult =
_lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME),
"*", 1000, filter, null, 0, 10, null, null,
new SearchFlags().setSkipCache(false));
Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
eq(1000), eq(0L), eq(1L));
verify(_lineageSearchService, times(4)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
assertEquals(searchResult.getNumEntities().intValue(), 0);
assertEquals(searchResult.getEntities().size(), 0);

Expand All @@ -506,14 +529,16 @@ public void testLightningSearchService() throws Exception {
new SearchFlags().setSkipCache(false));
assertEquals(searchResult.getNumEntities().intValue(), 0);
assertEquals(searchResult.getEntities().size(), 0);
verify(_lineageSearchService, times(5)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());

// Cached
searchResult =
_lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(ENTITY_NAME),
"*", 1000, originFilter, null, 0, 10, null, null,
new SearchFlags().setSkipCache(false));
Mockito.verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
verify(_graphService, times(1)).getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
eq(1000), eq(0L), eq(1L));
verify(_lineageSearchService, times(6)).getLightningSearchResult(any(), any(), anyInt(), anyInt(), anySet());
assertEquals(searchResult.getNumEntities().intValue(), 0);
assertEquals(searchResult.getEntities().size(), 0);

Expand Down Expand Up @@ -749,7 +774,5 @@ public void testCanDoLightning() throws Exception {
size = 10;
filter = new Filter().setOr(conCritArr);
Assert.assertTrue(_lineageSearchService.canDoLightning(lineageRelationships, "*", filter, null));


}
}

0 comments on commit 01baea5

Please sign in to comment.