Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Dataquality aspect): Added Data Quality Metrics aspect to emit data quality metrics metadata into Datahub #9265

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ private Constants() {}
public static final String BROWSE_PATH_V2_DELIMITER = "␟";
public static final String VERSION_STAMP_FIELD_NAME = "versionStamp";
public static final String ENTITY_FILTER_NAME = "_entityType";
public static final String DATAQUALITY_SCHEMA_FILE = "dataquality.graphql";

public static final Set<String> DEFAULT_PERSONA_URNS =
ImmutableSet.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.DatasetStatsSummary;
import com.linkedin.datahub.graphql.generated.Deprecation;
import com.linkedin.datahub.graphql.generated.DimensionNameEntity;
import com.linkedin.datahub.graphql.generated.Domain;
import com.linkedin.datahub.graphql.generated.ERModelRelationship;
import com.linkedin.datahub.graphql.generated.ERModelRelationshipProperties;
Expand All @@ -80,6 +81,7 @@
import com.linkedin.datahub.graphql.generated.LineageRelationship;
import com.linkedin.datahub.graphql.generated.ListAccessTokenResult;
import com.linkedin.datahub.graphql.generated.ListBusinessAttributesResult;
import com.linkedin.datahub.graphql.generated.ListDimensionNameResult;
import com.linkedin.datahub.graphql.generated.ListDomainsResult;
import com.linkedin.datahub.graphql.generated.ListGroupsResult;
import com.linkedin.datahub.graphql.generated.ListOwnershipTypesResult;
Expand Down Expand Up @@ -161,6 +163,10 @@
import com.linkedin.datahub.graphql.resolvers.dataset.DatasetUsageStatsResolver;
import com.linkedin.datahub.graphql.resolvers.dataset.IsAssignedToMeResolver;
import com.linkedin.datahub.graphql.resolvers.deprecation.UpdateDeprecationResolver;
import com.linkedin.datahub.graphql.resolvers.dimension.CreateDimensionNameResolver;
import com.linkedin.datahub.graphql.resolvers.dimension.DeleteDimensionNameResolver;
import com.linkedin.datahub.graphql.resolvers.dimension.ListDimensionNameResolver;
import com.linkedin.datahub.graphql.resolvers.dimension.UpdateDimensionNameResolver;
import com.linkedin.datahub.graphql.resolvers.domain.CreateDomainResolver;
import com.linkedin.datahub.graphql.resolvers.domain.DeleteDomainResolver;
import com.linkedin.datahub.graphql.resolvers.domain.DomainEntitiesResolver;
Expand Down Expand Up @@ -349,6 +355,7 @@
import com.linkedin.datahub.graphql.types.dataset.VersionedDatasetType;
import com.linkedin.datahub.graphql.types.dataset.mappers.DatasetProfileMapper;
import com.linkedin.datahub.graphql.types.datatype.DataTypeType;
import com.linkedin.datahub.graphql.types.dimension.DimensionNameType;
import com.linkedin.datahub.graphql.types.domain.DomainType;
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeType;
import com.linkedin.datahub.graphql.types.ermodelrelationship.CreateERModelRelationshipResolver;
Expand Down Expand Up @@ -395,6 +402,7 @@
import com.linkedin.metadata.service.AssertionService;
import com.linkedin.metadata.service.BusinessAttributeService;
import com.linkedin.metadata.service.DataProductService;
import com.linkedin.metadata.service.DimensionTypeService;
import com.linkedin.metadata.service.ERModelRelationshipService;
import com.linkedin.metadata.service.FormService;
import com.linkedin.metadata.service.LineageService;
Expand Down Expand Up @@ -469,6 +477,7 @@ public class GmsGraphQLEngine {
private final ERModelRelationshipService erModelRelationshipService;
private final FormService formService;
private final RestrictedService restrictedService;
private final DimensionTypeService dimensionTypeService;
private ConnectionService connectionService;
private AssertionService assertionService;

Expand Down Expand Up @@ -527,6 +536,7 @@ public class GmsGraphQLEngine {
private final FormType formType;
private final IncidentType incidentType;
private final RestrictedType restrictedType;
private final DimensionNameType dimensionNameType;

private final int graphQLQueryComplexityLimit;
private final int graphQLQueryDepthLimit;
Expand Down Expand Up @@ -593,6 +603,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.restrictedService = args.restrictedService;
this.connectionService = args.connectionService;
this.assertionService = args.assertionService;
this.dimensionTypeService = args.dimensionTypeService;

this.businessAttributeService = args.businessAttributeService;
this.ingestionConfiguration = Objects.requireNonNull(args.ingestionConfiguration);
Expand Down Expand Up @@ -646,6 +657,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.formType = new FormType(entityClient);
this.incidentType = new IncidentType(entityClient);
this.restrictedType = new RestrictedType(entityClient, restrictedService);
this.dimensionNameType = new DimensionNameType(entityClient);

this.graphQLQueryComplexityLimit = args.graphQLQueryComplexityLimit;
this.graphQLQueryDepthLimit = args.graphQLQueryDepthLimit;
Expand Down Expand Up @@ -696,7 +708,8 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
formType,
incidentType,
restrictedType,
businessAttributeType));
businessAttributeType,
dimensionNameType));
this.loadableTypes = new ArrayList<>(entityTypes);
// Extend loadable types with types from the plugins
// This allows us to offer search and browse capabilities out of the box for
Expand Down Expand Up @@ -796,6 +809,7 @@ public void configureRuntimeWiring(final RuntimeWiring.Builder builder) {
configureConnectionResolvers(builder);
configureDeprecationResolvers(builder);
configureMetadataAttributionResolver(builder);
configureDimensionNameResolver(builder);
}

private void configureOrganisationRoleResolvers(RuntimeWiring.Builder builder) {
Expand Down Expand Up @@ -850,7 +864,8 @@ public GraphQLEngine.Builder builder() {
.addSchema(fileBasedSchema(ASSERTIONS_SCHEMA_FILE))
.addSchema(fileBasedSchema(INCIDENTS_SCHEMA_FILE))
.addSchema(fileBasedSchema(CONTRACTS_SCHEMA_FILE))
.addSchema(fileBasedSchema(COMMON_SCHEMA_FILE));
.addSchema(fileBasedSchema(COMMON_SCHEMA_FILE))
.addSchema(fileBasedSchema(DATAQUALITY_SCHEMA_FILE));

for (GmsGraphQLPlugin plugin : this.graphQLPlugins) {
List<String> pluginSchemaFiles = plugin.getSchemaFiles();
Expand Down Expand Up @@ -1100,7 +1115,9 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
"listBusinessAttributes", new ListBusinessAttributesResolver(this.entityClient))
.dataFetcher(
"docPropagationSettings",
new DocPropagationSettingsResolver(this.settingsService)));
new DocPropagationSettingsResolver(this.settingsService))
.dataFetcher(
"listDimensionNames", new ListDimensionNameResolver(this.entityClient)));
}

private DataFetcher getEntitiesResolver() {
Expand Down Expand Up @@ -1361,8 +1378,14 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("updateForm", new UpdateFormResolver(this.entityClient))
.dataFetcher(
"updateDocPropagationSettings",
new UpdateDocPropagationSettingsResolver(this.settingsService));

new UpdateDocPropagationSettingsResolver(this.settingsService))
.dataFetcher(
"createDimensionName", new CreateDimensionNameResolver(this.dimensionTypeService))
.dataFetcher(
"updateDimensionName", new UpdateDimensionNameResolver(this.dimensionTypeService))
.dataFetcher(
"deleteDimensionName",
new DeleteDimensionNameResolver(this.dimensionTypeService));
if (featureFlags.isBusinessAttributeEntityEnabled()) {
typeWiring
.dataFetcher(
Expand Down Expand Up @@ -3003,6 +3026,21 @@ private void configureOwnershipTypeResolver(final RuntimeWiring.Builder builder)
.collect(Collectors.toList()))));
}

private void configureDimensionNameResolver(final RuntimeWiring.Builder builder) {
builder.type(
"ListDimensionNameResult",
typeWiring ->
typeWiring.dataFetcher(
"dimensionNames",
new LoadableTypeBatchResolver<>(
dimensionNameType,
(env) ->
((ListDimensionNameResult) env.getSource())
.getDimensionNames().stream()
.map(DimensionNameEntity::getUrn)
.collect(Collectors.toList()))));
}

private void configureDataProcessInstanceResolvers(final RuntimeWiring.Builder builder) {
builder.type(
"DataProcessInstance",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.linkedin.metadata.service.AssertionService;
import com.linkedin.metadata.service.BusinessAttributeService;
import com.linkedin.metadata.service.DataProductService;
import com.linkedin.metadata.service.DimensionTypeService;
import com.linkedin.metadata.service.ERModelRelationshipService;
import com.linkedin.metadata.service.FormService;
import com.linkedin.metadata.service.LineageService;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class GmsGraphQLEngineArgs {
BusinessAttributeService businessAttributeService;
ConnectionService connectionService;
AssertionService assertionService;
DimensionTypeService dimensionTypeService;

// any fork specific args should go below this line
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.linkedin.datahub.graphql.resolvers.dimension;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.CreateDimensionNameInput;
import com.linkedin.datahub.graphql.generated.DimensionNameEntity;
import com.linkedin.datahub.graphql.generated.DimensionNameInfo;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.metadata.service.DimensionTypeService;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class CreateDimensionNameResolver
implements DataFetcher<CompletableFuture<DimensionNameEntity>> {

private final DimensionTypeService _dimensionTypeService;

@Override
public CompletableFuture<DimensionNameEntity> get(DataFetchingEnvironment environment)
throws Exception {
final QueryContext context = environment.getContext();
final CreateDimensionNameInput input =
bindArgument(environment.getArgument("input"), CreateDimensionNameInput.class);

return CompletableFuture.supplyAsync(
() -> {
try {
final Urn urn =
_dimensionTypeService.createDimensionName(
context.getOperationContext(),
input.getName(),
input.getDescription(),
System.currentTimeMillis());
return createDimensionName(urn, input);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to perform update against input %s", input), e);
}
});
}

private DimensionNameEntity createDimensionName(
@Nonnull final Urn urn, @Nonnull final CreateDimensionNameInput input) {
return DimensionNameEntity.builder()
.setUrn(urn.toString())
.setType(EntityType.DIMENSION_NAME)
.setInfo(new DimensionNameInfo(input.getName(), input.getDescription(), null, null))
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.linkedin.datahub.graphql.resolvers.dimension;

import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.metadata.service.DimensionTypeService;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class DeleteDimensionNameResolver implements DataFetcher<CompletableFuture<Boolean>> {

private final DimensionTypeService _dimensionTypeService;

@Override
public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throws Exception {
final QueryContext context = environment.getContext();
final String dimensionNameUrn = environment.getArgument("urn");
final Urn urn = UrnUtils.getUrn(dimensionNameUrn);
// By default, delete references
final boolean deleteReferences =
environment.getArgument("deleteReferences") == null
? true
: environment.getArgument("deleteReferences");

return CompletableFuture.supplyAsync(
() -> {
try {
_dimensionTypeService.deleteDimensionType(
context.getOperationContext(), urn, deleteReferences);
log.info(String.format("Successfully deleted dimension name %s with urn", urn));
return true;
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to delete dimension name with urn %s", dimensionNameUrn), e);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.linkedin.datahub.graphql.resolvers.dimension;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.DimensionNameEntity;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.ListDimensionNameInput;
import com.linkedin.datahub.graphql.generated.ListDimensionNameResult;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.query.filter.SortOrder;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchResult;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class ListDimensionNameResolver
implements DataFetcher<CompletableFuture<ListDimensionNameResult>> {

private static final String CREATED_AT_FIELD = "createdAt";
private static final SortCriterion DEFAULT_SORT_CRITERION =
new SortCriterion().setField(CREATED_AT_FIELD).setOrder(SortOrder.DESCENDING);

private static final Integer DEFAULT_START = 0;
private static final Integer DEFAULT_COUNT = 20;
private static final String DEFAULT_QUERY = "";

private final EntityClient _entityClient;

@Override
public CompletableFuture<ListDimensionNameResult> get(DataFetchingEnvironment environment)
throws Exception {
final QueryContext context = environment.getContext();
final ListDimensionNameInput input =
bindArgument(environment.getArgument("input"), ListDimensionNameInput.class);

return CompletableFuture.supplyAsync(
() -> {
final Integer start = input.getStart() == null ? DEFAULT_START : input.getStart();
final Integer count = input.getCount() == null ? DEFAULT_COUNT : input.getCount();
final String query = input.getQuery() == null ? DEFAULT_QUERY : input.getQuery();
final List<FacetFilterInput> filters =
input.getFilters() == null ? Collections.emptyList() : input.getFilters();

try {
List<SortCriterion> sortCriterionFiler = new ArrayList<>();
sortCriterionFiler.add(DEFAULT_SORT_CRITERION);
final SearchResult gmsResult =
_entityClient.search(
context.getOperationContext().withSearchFlags(flags -> flags.setFulltext(true)),
Constants.DIMENSION_TYPE_ENTITY_NAME,
query,
buildFilter(filters, Collections.emptyList()),
sortCriterionFiler,
start,
count);

final ListDimensionNameResult result = new ListDimensionNameResult();
result.setStart(gmsResult.getFrom());
result.setCount(gmsResult.getPageSize());
result.setTotal(gmsResult.getNumEntities());
result.setDimensionNames(
mapUnresolvedDimensionNames(
gmsResult.getEntities().stream()
.map(SearchEntity::getEntity)
.collect(Collectors.toList())));
return result;
} catch (Exception e) {
throw new RuntimeException("Failed to list dimension names", e);
}
});
}

private List<DimensionNameEntity> mapUnresolvedDimensionNames(List<Urn> entityUrns) {
final List<DimensionNameEntity> results = new ArrayList<>();
for (final Urn urn : entityUrns) {
final DimensionNameEntity unresolvedView = new DimensionNameEntity();
unresolvedView.setUrn(urn.toString());
unresolvedView.setType(EntityType.DIMENSION_NAME);
results.add(unresolvedView);
}
return results;
}
}
Loading