Merge branch 'master' into master+ing-195-sqlglot-lineage-for-tll-in-powerbi

siddiquebagwan-gslab authored Aug 9, 2023
2 parents 5e4387f + 104f0f8 commit 78e7944
Showing 22 changed files with 330 additions and 428 deletions.
@@ -60,7 +60,7 @@ export function getDataForEntityType<T>({
};
}

if (anyEntityData?.siblings?.siblings?.length > 0 && !isHideSiblingMode) {
if (anyEntityData?.siblings?.siblings?.filter((sibling) => sibling.exists).length > 0 && !isHideSiblingMode) {
const genericSiblingProperties: GenericEntityProperties[] = anyEntityData?.siblings?.siblings?.map((sibling) =>
getDataForEntityType({ data: sibling, getOverrideProperties: () => ({}) }),
);
6 changes: 6 additions & 0 deletions datahub-web-react/src/graphql/lineage.graphql
@@ -251,6 +251,9 @@ fragment lineageFields on EntityWithRelationships {
siblings {
urn
type
... on Dataset {
exists
}
...lineageNodeProperties
}
}
@@ -369,6 +372,9 @@ query getEntityLineage(
siblings {
urn
type
... on Dataset {
exists
}
...lineageNodeProperties
}
}
7 changes: 7 additions & 0 deletions metadata-ingestion/README.md
@@ -159,6 +159,13 @@ reporting:
- type: datahub
config:
report_recipe: false

# Optional: log failed JSON payloads to a file.
# Helpful when you are trying to debug a specific ingestion that is failing.
failure_log:
enabled: false
log_config:
filename: ./path/to/failure.json
```
#### Deploying and scheduling ingestion to the UI
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -357,6 +357,10 @@ def make_lineage_mce(
downstream_urn: str,
lineage_type: str = DatasetLineageTypeClass.TRANSFORMED,
) -> MetadataChangeEventClass:
"""
Note: this function only supports lineage for dataset aspects. It will not
update lineage for any other aspect types.
"""
mce = MetadataChangeEventClass(
proposedSnapshot=DatasetSnapshotClass(
urn=downstream_urn,
@@ -1,35 +1,40 @@
package com.linkedin.metadata.search;

import com.codahale.metrics.Timer;
import com.linkedin.data.template.LongMap;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.search.cache.CachingAllEntitiesSearchAggregator;
import com.linkedin.metadata.search.cache.EntityDocCountCache;
import com.linkedin.metadata.search.client.CachingEntitySearchService;
import com.linkedin.metadata.search.ranker.SearchRanker;
import com.linkedin.metadata.utils.SearchUtil;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.utils.SearchUtil.*;


@Slf4j
public class SearchService {
private final CachingEntitySearchService _cachingEntitySearchService;
private final CachingAllEntitiesSearchAggregator _cachingAllEntitiesSearchAggregator;
private final EntityDocCountCache _entityDocCountCache;
private final SearchRanker _searchRanker;

public SearchService(
EntityDocCountCache entityDocCountCache,
CachingEntitySearchService cachingEntitySearchService,
CachingAllEntitiesSearchAggregator cachingEntitySearchAggregator,
SearchRanker searchRanker) {
_cachingEntitySearchService = cachingEntitySearchService;
_cachingAllEntitiesSearchAggregator = cachingEntitySearchAggregator;
_searchRanker = searchRanker;
_entityDocCountCache = entityDocCountCache;
}
@@ -44,7 +49,7 @@ public Map<String, Long> docCountPerEntity(@Nonnull List<String> entityNames) {
* Gets a list of documents that match given search request. The results are aggregated and filters are applied to the
* search hits and not the aggregation results.
*
* @param entityNames names of the entity
* @param entityNames names of the entities
* @param input the search input text
* @param postFilters the request map with fields and values as filters to be applied to search hits
* @param sortCriterion {@link SortCriterion} to be applied to search results
@@ -56,8 +61,13 @@ public Map<String, Long> docCountPerEntity(@Nonnull List<String> entityNames) {
@Nonnull
public SearchResult search(@Nonnull List<String> entityNames, @Nonnull String input, @Nullable Filter postFilters,
@Nullable SortCriterion sortCriterion, int from, int size, @Nullable SearchFlags searchFlags) {
List<String> entitiesToSearch = getEntitiesToSearch(entityNames);
if (entitiesToSearch.isEmpty()) {
// Optimization: If the indices are all empty, return empty result
return getEmptySearchResult(from, size);
}
SearchResult result =
_cachingEntitySearchService.search(entityNames, input, postFilters, sortCriterion, from, size, searchFlags, null);
_cachingEntitySearchService.search(entitiesToSearch, input, postFilters, sortCriterion, from, size, searchFlags, null);

try {
return result.copy().setEntities(new SearchEntityArray(_searchRanker.rank(result.getEntities())));
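The empty-index short-circuit added to `search` above is easiest to see from the caller's side. Below is a minimal Mockito-style sketch of the guarantee; the collaborator mocks, the `dataset` entity name, and the query values are illustrative assumptions, not part of this change:

```java
import static org.mockito.Mockito.*;

import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.search.cache.CachingAllEntitiesSearchAggregator;
import com.linkedin.metadata.search.cache.EntityDocCountCache;
import com.linkedin.metadata.search.client.CachingEntitySearchService;
import com.linkedin.metadata.search.ranker.SearchRanker;
import java.util.Collections;
import java.util.List;

public class SearchServiceShortCircuitSketch {
  public static void main(String[] args) {
    EntityDocCountCache docCountCache = mock(EntityDocCountCache.class);
    CachingEntitySearchService entitySearch = mock(CachingEntitySearchService.class);
    CachingAllEntitiesSearchAggregator aggregator = mock(CachingAllEntitiesSearchAggregator.class);
    SearchRanker ranker = mock(SearchRanker.class);

    // Every index is empty, so the doc-count cache reports no non-empty entities.
    when(docCountCache.getNonEmptyEntities()).thenReturn(Collections.emptyList());

    SearchService service = new SearchService(docCountCache, entitySearch, aggregator, ranker);
    SearchResult result = service.search(List.of("dataset"), "*", null, null, 0, 10, null);

    System.out.println(result.getNumEntities()); // 0, synthesized locally
    // The search backend is never queried when no index has documents.
    verify(entitySearch, never())
        .search(anyList(), anyString(), any(), any(), anyInt(), anyInt(), any(), any());
  }
}
```

The same guard protects `scrollAcrossEntities` below via `getEmptyScrollResult`.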
@@ -95,7 +105,58 @@ public SearchResult searchAcrossEntities(@Nonnull List<String> entities, @Nonnul
log.debug(String.format(
"Searching Search documents entities: %s, input: %s, postFilters: %s, sortCriterion: %s, from: %s, size: %s",
entities, input, postFilters, sortCriterion, from, size));
return _cachingAllEntitiesSearchAggregator.getSearchResults(entities, input, postFilters, sortCriterion, from, size, searchFlags, facets);
// DEPRECATED: the `entity` facet is the legacy version of `_entityType`. It is handled as a special
// case and does not support ORs, unions, etc. It is kept only to avoid breaking old clients; when
// sending filters to the backend, use the new `_entityType` filter name provided above instead.
boolean aggregateByLegacyEntityFacet = facets != null && facets.contains("entity");
if (aggregateByLegacyEntityFacet) {
facets = new ArrayList<>(facets);
facets.add(INDEX_VIRTUAL_FIELD);
}
List<String> nonEmptyEntities = getEntitiesToSearch(entities);
if (nonEmptyEntities.isEmpty()) {
// Optimization: If the indices are all empty, return empty result
return getEmptySearchResult(from, size);
}
SearchResult result = _cachingEntitySearchService.search(nonEmptyEntities, input, postFilters, sortCriterion, from, size, searchFlags, facets);
if (facets == null || facets.contains("entity") || facets.contains("_entityType")) {
Optional<AggregationMetadata> entityTypeAgg = result.getMetadata().getAggregations().stream().filter(
aggMeta -> aggMeta.getName().equals(INDEX_VIRTUAL_FIELD)).findFirst();
if (entityTypeAgg.isPresent()) {
LongMap numResultsPerEntity = entityTypeAgg.get().getAggregations();
result.getMetadata()
.getAggregations()
.add(new AggregationMetadata().setName("entity")
.setDisplayName("Type")
.setAggregations(numResultsPerEntity)
.setFilterValues(new FilterValueArray(SearchUtil.convertToFilters(numResultsPerEntity, Collections.emptySet()))));
} else {
// Should not happen, since the _entityType aggregation was added above; if it does, fall back to a
// best-effort count of entity types. This will not include entity types that had 0 results.
Map<String, Long> numResultsPerEntity = result.getEntities().stream().collect(Collectors.groupingBy(
entity -> entity.getEntity().getEntityType(), Collectors.counting()));
result.getMetadata()
.getAggregations()
.add(new AggregationMetadata().setName("entity")
.setDisplayName("Type")
.setAggregations(new LongMap(numResultsPerEntity))
.setFilterValues(new FilterValueArray(SearchUtil.convertToFilters(numResultsPerEntity, Collections.emptySet()))));
}
}
return result;
}

private List<String> getEntitiesToSearch(@Nonnull List<String> inputEntities) {
List<String> nonEmptyEntities;
List<String> lowercaseEntities = inputEntities.stream().map(String::toLowerCase).collect(Collectors.toList());
try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "getNonEmptyEntities").time()) {
nonEmptyEntities = _entityDocCountCache.getNonEmptyEntities();
}
if (!inputEntities.isEmpty()) {
nonEmptyEntities = nonEmptyEntities.stream().filter(lowercaseEntities::contains).collect(Collectors.toList());
}
return nonEmptyEntities;
}

/**
@@ -118,6 +179,26 @@ public ScrollResult scrollAcrossEntities(@Nonnull List<String> entities, @Nonnul
log.debug(String.format(
"Searching Search documents entities: %s, input: %s, postFilters: %s, sortCriterion: %s, from: %s, size: %s",
entities, input, postFilters, sortCriterion, scrollId, size));
return _cachingEntitySearchService.scroll(entities, input, postFilters, sortCriterion, scrollId, keepAlive, size, searchFlags);
List<String> entitiesToSearch = getEntitiesToSearch(entities);
if (entitiesToSearch.isEmpty()) {
// No indices with non-zero entries: skip querying and return empty result
return getEmptyScrollResult(size);
}
return _cachingEntitySearchService.scroll(entitiesToSearch, input, postFilters, sortCriterion, scrollId, keepAlive, size, searchFlags);
}

private static SearchResult getEmptySearchResult(int from, int size) {
return new SearchResult().setEntities(new SearchEntityArray())
.setNumEntities(0)
.setFrom(from)
.setPageSize(size)
.setMetadata(new SearchResultMetadata().setAggregations(new AggregationMetadataArray()));
}

private static ScrollResult getEmptyScrollResult(int size) {
return new ScrollResult().setEntities(new SearchEntityArray())
.setNumEntities(0)
.setPageSize(size)
.setMetadata(new SearchResultMetadata().setAggregations(new AggregationMetadataArray()));
}
}
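To illustrate the backwards-compatibility path in `searchAcrossEntities`: a client can still request the deprecated `entity` facet and read back an aggregation under that name, now backed by the `_entityType` virtual field. The sketch below is a hypothetical caller; the entity names, query string, and `printLegacyEntityCounts` helper are assumptions for illustration only:

```java
import com.linkedin.metadata.search.AggregationMetadata;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.search.SearchService;
import java.util.List;

public class LegacyEntityFacetSketch {
  // Assumes a SearchService wired with the four collaborators shown in the constructor above.
  static void printLegacyEntityCounts(SearchService searchService) {
    SearchResult result = searchService.searchAcrossEntities(
        List.of("dataset", "chart"), "pet_profiles", null, null, 0, 20, null,
        List.of("entity")); // deprecated facet name; new clients should use `_entityType`
    for (AggregationMetadata agg : result.getMetadata().getAggregations()) {
      if ("entity".equals(agg.getName())) {
        // Per-entity-type counts, synthesized from the `_entityType` virtual field.
        System.out.println(agg.getAggregations());
      }
    }
  }
}
```

The synthesized `entity` aggregation exists only so older clients keep working; filters sent to the backend by new clients should use `_entityType`.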