Skip to content

Commit

Permalink
feat(platform): add support for via nodes (#9733)
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka authored Jan 30, 2024
1 parent f378fb6 commit 1d06d38
Show file tree
Hide file tree
Showing 60 changed files with 2,401 additions and 292 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,8 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
"scrollAcrossEntities",
new ScrollAcrossEntitiesResolver(this.entityClient, this.viewService))
.dataFetcher(
"searchAcrossLineage", new SearchAcrossLineageResolver(this.entityClient))
"searchAcrossLineage",
new SearchAcrossLineageResolver(this.entityClient, this.entityRegistry))
.dataFetcher(
"scrollAcrossLineage", new ScrollAcrossLineageResolver(this.entityClient))
.dataFetcher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ public static CompletableFuture<List<Entity>> batchLoadEntitiesOfSameType(
.filter(entity -> entities.get(0).getClass().isAssignableFrom(entity.objectClass()))
.collect(Collectors.toList()));

final DataLoader loader = dataLoaderRegistry.getDataLoader(filteredEntity.name());
List keyList = new ArrayList();
final DataLoader<Object, Entity> loader =
dataLoaderRegistry.getDataLoader(filteredEntity.name());
List<Object> keyList = new ArrayList();
for (Entity entity : entities) {
keyList.add(filteredEntity.getKeyProvider().apply(entity));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
import static com.linkedin.datahub.graphql.resolvers.search.SearchUtils.*;
import static com.linkedin.metadata.Constants.QUERY_ENTITY_NAME;

import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
Expand All @@ -14,31 +16,63 @@
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.datahub.graphql.types.mappers.UrnSearchAcrossLineageResultsMapper;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.LineageSearchResult;
import com.linkedin.r2.RemoteInvocationException;
import graphql.VisibleForTesting;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/** Resolver responsible for resolving 'searchAcrossEntities' field of the Query type */
@Slf4j
@RequiredArgsConstructor
public class SearchAcrossLineageResolver
implements DataFetcher<CompletableFuture<SearchAcrossLineageResults>> {

private static final int DEFAULT_START = 0;
private static final int DEFAULT_COUNT = 10;

private static final Set<String> TRANSIENT_ENTITIES = ImmutableSet.of(QUERY_ENTITY_NAME);

private final EntityClient _entityClient;

private final EntityRegistry _entityRegistry;

@VisibleForTesting final Set<String> _allEntities;
private final List<String> _allowedEntities;

public SearchAcrossLineageResolver(EntityClient entityClient, EntityRegistry entityRegistry) {
this._entityClient = entityClient;
this._entityRegistry = entityRegistry;
this._allEntities =
entityRegistry.getEntitySpecs().values().stream()
.map(EntitySpec::getName)
.collect(Collectors.toSet());

this._allowedEntities =
this._allEntities.stream()
.filter(e -> !TRANSIENT_ENTITIES.contains(e))
.collect(Collectors.toList());
}

private List<String> getEntityNamesFromInput(List<EntityType> inputTypes) {
if (inputTypes != null && !inputTypes.isEmpty()) {
return inputTypes.stream().map(EntityTypeMapper::getName).collect(Collectors.toList());
} else {
return this._allowedEntities;
}
}

@Override
public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment environment)
throws URISyntaxException {
Expand All @@ -50,12 +84,7 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment

final LineageDirection lineageDirection = input.getDirection();

List<EntityType> entityTypes =
(input.getTypes() == null || input.getTypes().isEmpty())
? SEARCHABLE_ENTITY_TYPES
: input.getTypes();
List<String> entityNames =
entityTypes.stream().map(EntityTypeMapper::getName).collect(Collectors.toList());
List<String> entityNames = getEntityNamesFromInput(input.getTypes());

// escape forward slash since it is a reserved character in Elasticsearch
final String sanitizedQuery =
Expand Down Expand Up @@ -99,8 +128,7 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
} else {
searchFlags = new SearchFlags().setFulltext(true).setSkipHighlighting(true);
}

return UrnSearchAcrossLineageResultsMapper.map(
LineageSearchResult salResults =
_entityClient.searchAcrossLineage(
urn,
resolvedDirection,
Expand All @@ -114,7 +142,9 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
startTimeMillis,
endTimeMillis,
searchFlags,
ResolverUtils.getAuthentication(environment)));
getAuthentication(environment));

return UrnSearchAcrossLineageResultsMapper.map(salResults);
} catch (RemoteInvocationException e) {
log.error(
"Failed to execute search across relationships: source urn {}, direction {}, entity types {}, query {}, filters: {}, start: {}, count: {}",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.datahub.graphql.resolvers.search;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.search.utils.SearchUtils.applyDefaultSearchFlags;

import com.linkedin.datahub.graphql.generated.SearchInput;
Expand All @@ -10,6 +11,9 @@
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.datahub.graphql.types.mappers.UrnSearchResultsMapper;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.query.GroupingCriterion;
import com.linkedin.metadata.query.GroupingCriterionArray;
import com.linkedin.metadata.query.GroupingSpec;
import com.linkedin.metadata.query.SearchFlags;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
Expand All @@ -28,7 +32,14 @@ public class SearchResolver implements DataFetcher<CompletableFuture<SearchResul
.setMaxAggValues(20)
.setSkipCache(false)
.setSkipAggregates(false)
.setSkipHighlighting(false);
.setSkipHighlighting(false)
.setGroupingSpec(
new GroupingSpec()
.setGroupingCriteria(
new GroupingCriterionArray(
new GroupingCriterion()
.setBaseEntityType(SCHEMA_FIELD_ENTITY_NAME)
.setGroupingEntityType(DATASET_ENTITY_NAME))));
private static final int DEFAULT_START = 0;
private static final int DEFAULT_COUNT = 10;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.datahub.graphql.types.common.mappers;

import com.linkedin.data.template.SetMode;
import com.linkedin.datahub.graphql.generated.GroupingCriterion;
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import javax.annotation.Nonnull;

public class GroupingCriterionInputMapper
implements ModelMapper<GroupingCriterion, com.linkedin.metadata.query.GroupingCriterion> {

public static final GroupingCriterionInputMapper INSTANCE = new GroupingCriterionInputMapper();

public static com.linkedin.metadata.query.GroupingCriterion map(
@Nonnull final GroupingCriterion groupingCriterion) {
return INSTANCE.apply(groupingCriterion);
}

@Override
public com.linkedin.metadata.query.GroupingCriterion apply(GroupingCriterion input) {
return new com.linkedin.metadata.query.GroupingCriterion()
.setBaseEntityType(
input.getBaseEntityType() != null
? EntityTypeMapper.getName(input.getBaseEntityType())
: null,
SetMode.REMOVE_OPTIONAL_IF_NULL)
.setGroupingEntityType(EntityTypeMapper.getName(input.getGroupingEntityType()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.linkedin.datahub.graphql.generated.SearchFlags;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import com.linkedin.metadata.query.GroupingCriterionArray;
import com.linkedin.metadata.query.GroupingSpec;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/**
Expand Down Expand Up @@ -42,6 +45,16 @@ public com.linkedin.metadata.query.SearchFlags apply(@Nonnull final SearchFlags
if (searchFlags.getGetSuggestions() != null) {
result.setGetSuggestions(searchFlags.getGetSuggestions());
}
if (searchFlags.getGroupingSpec() != null
&& searchFlags.getGroupingSpec().getGroupingCriteria() != null) {
result.setGroupingSpec(
new GroupingSpec()
.setGroupingCriteria(
new GroupingCriterionArray(
searchFlags.getGroupingSpec().getGroupingCriteria().stream()
.map(GroupingCriterionInputMapper::map)
.collect(Collectors.toList()))));
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.linkedin.datahub.graphql.generated.MLPrimaryKey;
import com.linkedin.datahub.graphql.generated.Notebook;
import com.linkedin.datahub.graphql.generated.OwnershipTypeEntity;
import com.linkedin.datahub.graphql.generated.QueryEntity;
import com.linkedin.datahub.graphql.generated.Role;
import com.linkedin.datahub.graphql.generated.SchemaFieldEntity;
import com.linkedin.datahub.graphql.generated.StructuredPropertyEntity;
Expand Down Expand Up @@ -198,6 +199,11 @@ public Entity apply(Urn input) {
((StructuredPropertyEntity) partialEntity).setUrn(input.toString());
((StructuredPropertyEntity) partialEntity).setType(EntityType.STRUCTURED_PROPERTY);
}
if (input.getEntityType().equals(QUERY_ENTITY_NAME)) {
partialEntity = new QueryEntity();
((QueryEntity) partialEntity).setUrn(input.toString());
((QueryEntity) partialEntity).setType(EntityType.QUERY);
}
return partialEntity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ private SearchAcrossLineageResult mapResult(LineageSearchEntity searchEntity) {
.setMatchedFields(getMatchedFieldEntry(searchEntity.getMatchedFields()))
.setPaths(searchEntity.getPaths().stream().map(this::mapPath).collect(Collectors.toList()))
.setDegree(searchEntity.getDegree())
.setDegrees(searchEntity.getDegrees().stream().collect(Collectors.toList()))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class QueryType
implements com.linkedin.datahub.graphql.types.EntityType<QueryEntity, String> {
Expand Down Expand Up @@ -50,6 +52,7 @@ public List<DataFetcherResult<QueryEntity>> batchLoad(
final List<Urn> viewUrns = urns.stream().map(UrnUtils::getUrn).collect(Collectors.toList());

try {
log.debug("Fetching query entities: {}", viewUrns);
final Map<Urn, EntityResponse> entities =
_entityClient.batchGetV2(
QUERY_ENTITY_NAME,
Expand Down
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -10948,6 +10948,11 @@ enum QuerySource {
The query was provided manually, e.g. from the UI.
"""
MANUAL

"""
The query was extracted by the system, e.g. from a dashboard.
"""
SYSTEM
}

"""
Expand Down
49 changes: 49 additions & 0 deletions datahub-graphql-core/src/main/resources/search.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,15 @@ input SearchFlags {
Whether to request for search suggestions on the _entityName virtualized field
"""
getSuggestions: Boolean

"""
Additional grouping specifications to apply to the search results
Grouping specifications will control how search results are grouped together
in the response. This is currently being used to group schema fields (columns)
as datasets, and in the future will be used to group other entities as well.
Note: This is an experimental feature and is subject to change.
"""
groupingSpec: GroupingSpec
}

"""
Expand Down Expand Up @@ -278,6 +287,7 @@ input ScrollAcrossEntitiesInput {
searchFlags: SearchFlags
}


"""
Input arguments for a search query over the results of a multi-hop graph query
"""
Expand Down Expand Up @@ -669,6 +679,12 @@ type SearchAcrossLineageResult {
Degree of relationship (number of hops to get to entity)
"""
degree: Int!

"""
Degrees of relationship (for entities discoverable at multiple degrees)
"""
degrees: [Int!]

}

"""
Expand Down Expand Up @@ -1303,4 +1319,37 @@ input SortCriterion {
The order in which we will be sorting
"""
sortOrder: SortOrder!
}

"""
A grouping specification for search results.
"""
input GroupingSpec {

"""
A list of grouping criteria for grouping search results.
There is no implied order in the grouping criteria.
"""
groupingCriteria: [GroupingCriterion!]

}

"""
A single grouping criterion for grouping search results
"""
input GroupingCriterion {

"""
The base entity type that needs to be grouped
e.g. schemaField
Omitting this field will result in all base entities being grouped into the groupingEntityType.
"""
baseEntityType: EntityType

"""
The type of entity being grouped into
e.g. dataset, domain, etc.
"""
groupingEntityType: EntityType!

}
Loading

0 comments on commit 1d06d38

Please sign in to comment.