Skip to content

Commit

Permalink
feat(mcp-mutator): new mcp mutator plugin (datahub-project#10904)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored and aviv-julienjehannet committed Jul 25, 2024
1 parent 4819aa1 commit eaf2bed
Show file tree
Hide file tree
Showing 25 changed files with 786 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.SystemMetadata;
import java.lang.reflect.InvocationTargetException;
import javax.annotation.Nonnull;
Expand All @@ -26,6 +27,9 @@ public interface ReadItem {
*/
@Nonnull
default String getAspectName() {
  // Without an aspect spec, fall back to the name of the GenericAspect data schema.
  return getAspectSpec() != null
      ? getAspectSpec().getName()
      : GenericAspect.dataSchema().getName();
}

Expand Down Expand Up @@ -72,6 +76,6 @@ static <T> T getAspect(Class<T> clazz, @Nullable RecordTemplate recordTemplate)
*
* @return aspect's specification
*/
@Nonnull
@Nullable
AspectSpec getAspectSpec();
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ static void applyWriteMutationHooks(
}
}

/**
 * Runs the proposal mutation of every mutation hook known to the entity registry and
 * concatenates the per-hook results into a single stream.
 *
 * <p>The hooks are looked up through the aspect retriever of the supplied retriever context;
 * each hook receives the full {@code proposedItems} collection.
 *
 * @param proposedItems proposal items handed to each hook
 * @param retrieverContext retriever context used to find the hooks and passed to each of them
 * @return the items emitted by all hooks, in hook order
 */
default Stream<MCPItem> applyProposalMutationHooks(
    Collection<MCPItem> proposedItems, @Nonnull RetrieverContext retrieverContext) {
  return retrieverContext
      .getAspectRetriever()
      .getEntityRegistry()
      .getAllMutationHooks()
      .stream()
      .flatMap(hook -> hook.applyProposalMutation(proposedItems, retrieverContext));
}

default <T extends BatchItem> ValidationExceptionCollection validateProposed(
Collection<T> mcpItems) {
return validateProposed(mcpItems, getRetrieverContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -25,20 +24,13 @@ public boolean enabled() {
}

public boolean shouldApply(
@Nullable ChangeType changeType, @Nonnull Urn entityUrn, @Nonnull AspectSpec aspectSpec) {
return shouldApply(changeType, entityUrn.getEntityType(), aspectSpec);
@Nullable ChangeType changeType, @Nonnull Urn entityUrn, @Nonnull String aspectName) {
return shouldApply(changeType, entityUrn.getEntityType(), aspectName);
}

public boolean shouldApply(
@Nullable ChangeType changeType,
@Nonnull EntitySpec entitySpec,
@Nonnull AspectSpec aspectSpec) {
return shouldApply(changeType, entitySpec.getName(), aspectSpec.getName());
}

public boolean shouldApply(
@Nullable ChangeType changeType, @Nonnull String entityName, @Nonnull AspectSpec aspectSpec) {
return shouldApply(changeType, entityName, aspectSpec.getName());
@Nullable ChangeType changeType, @Nonnull EntitySpec entitySpec, @Nonnull String aspectName) {
return shouldApply(changeType, entitySpec.getName(), aspectName);
}

public boolean shouldApply(
Expand All @@ -49,8 +41,8 @@ && isChangeTypeSupported(changeType)
}

protected boolean isEntityAspectSupported(
@Nonnull EntitySpec entitySpec, @Nonnull AspectSpec aspectSpec) {
return isEntityAspectSupported(entitySpec.getName(), aspectSpec.getName());
@Nonnull EntitySpec entitySpec, @Nonnull String aspectName) {
return isEntityAspectSupported(entitySpec.getName(), aspectName);
}

protected boolean isEntityAspectSupported(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public final Stream<MCLItem> apply(
@Nonnull Collection<MCLItem> batchItems, @Nonnull RetrieverContext retrieverContext) {
return applyMCLSideEffect(
batchItems.stream()
.filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectSpec()))
.filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public final Stream<ChangeMCP> apply(
Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
return applyMCPSideEffect(
changeMCPS.stream()
.filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectSpec()))
.filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}
Expand All @@ -41,7 +41,7 @@ public final Stream<MCPItem> postApply(
Collection<MCLItem> mclItems, @Nonnull RetrieverContext retrieverContext) {
return postMCPSideEffect(
mclItems.stream()
.filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectSpec()))
.filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.metadata.aspect.ReadItem;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.plugins.PluginSpec;
import com.linkedin.util.Pair;
import java.util.Collection;
Expand All @@ -24,7 +25,7 @@ public final Stream<Pair<ChangeMCP, Boolean>> applyWriteMutation(
@Nonnull Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
return writeMutation(
changeMCPS.stream()
.filter(i -> shouldApply(i.getChangeType(), i.getEntitySpec(), i.getAspectSpec()))
.filter(i -> shouldApply(i.getChangeType(), i.getEntitySpec(), i.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}
Expand All @@ -34,7 +35,23 @@ public final Stream<Pair<ReadItem, Boolean>> applyReadMutation(
@Nonnull Collection<ReadItem> items, @Nonnull RetrieverContext retrieverContext) {
return readMutation(
items.stream()
.filter(i -> isEntityAspectSupported(i.getEntitySpec(), i.getAspectSpec()))
.filter(i -> isEntityAspectSupported(i.getEntitySpec(), i.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}

/**
 * Applies this hook's proposal mutation, prior to validation, to the items it is configured for.
 *
 * <p>Items are first narrowed with {@code shouldApply} (change type, entity spec and aspect
 * name); only the matching items are handed to {@code proposalMutation}.
 *
 * @param mcpItems wrapped MCPs to consider
 * @param retrieverContext retriever context forwarded to the mutation
 * @return stream of mutated proposal items as produced by {@code proposalMutation}
 */
public final Stream<MCPItem> applyProposalMutation(
    @Nonnull Collection<MCPItem> mcpItems, @Nonnull RetrieverContext retrieverContext) {
  final Collection<MCPItem> applicableItems =
      mcpItems.stream()
          .filter(
              item ->
                  shouldApply(item.getChangeType(), item.getEntitySpec(), item.getAspectName()))
          .collect(Collectors.toList());
  return proposalMutation(applicableItems, retrieverContext);
}
Expand All @@ -48,4 +65,9 @@ protected Stream<Pair<ChangeMCP, Boolean>> writeMutation(
@Nonnull Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
return changeMCPS.stream().map(i -> Pair.of(i, false));
}

/**
 * Mutation step invoked by {@code applyProposalMutation} with the already-filtered items.
 *
 * <p>This base implementation emits nothing: it returns an empty stream regardless of the
 * input.
 *
 * @param mcpItems proposal items that passed the hook's applicability filter
 * @param retrieverContext retriever context
 * @return an empty stream
 */
protected Stream<MCPItem> proposalMutation(
    @Nonnull Collection<MCPItem> mcpItems, @Nonnull RetrieverContext retrieverContext) {
  return Stream.<MCPItem>empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public final Stream<AspectValidationException> validateProposed(
@Nonnull RetrieverContext retrieverContext) {
return validateProposedAspects(
mcpItems.stream()
.filter(i -> shouldApply(i.getChangeType(), i.getUrn(), i.getAspectSpec()))
.filter(i -> shouldApply(i.getChangeType(), i.getUrn(), i.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}
Expand All @@ -37,7 +37,7 @@ public final Stream<AspectValidationException> validatePreCommit(
@Nonnull Collection<ChangeMCP> changeMCPs, @Nonnull RetrieverContext retrieverContext) {
return validatePreCommitAspects(
changeMCPs.stream()
.filter(i -> shouldApply(i.getChangeType(), i.getUrn(), i.getAspectSpec()))
.filter(i -> shouldApply(i.getChangeType(), i.getUrn(), i.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}
Expand Down
1 change: 1 addition & 0 deletions metadata-io/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
api project(':metadata-service:services')
api project(':metadata-operation-context')

implementation spec.product.pegasus.restliServer
implementation spec.product.pegasus.data
implementation spec.product.pegasus.generator

Expand Down
7 changes: 7 additions & 0 deletions metadata-io/metadata-io-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,11 @@ dependencies {
implementation project(':metadata-utils')
compileOnly externalDependency.lombok
annotationProcessor externalDependency.lombok

testImplementation(externalDependency.testng)
testImplementation(externalDependency.mockito)
testImplementation(testFixtures(project(":entity-registry")))
testImplementation project(':metadata-operation-context')
testImplementation externalDependency.lombok
testAnnotationProcessor externalDependency.lombok
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.util.Pair;
Expand All @@ -18,6 +19,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -44,9 +46,20 @@ public class AspectsBatchImpl implements AspectsBatch {
public Pair<Map<String, Set<String>>, List<ChangeMCP>> toUpsertBatchItems(
final Map<String, Map<String, SystemAspect>> latestAspects) {

// Process proposals to change items
Stream<ChangeMCP> mutatedProposalsStream =
proposedItemsToChangeItemStream(
items.stream()
.filter(item -> item instanceof ProposedItem)
.map(item -> (MCPItem) item)
.collect(Collectors.toList()));
// Regular change items
Stream<? extends BatchItem> changeMCPStream =
items.stream().filter(item -> !(item instanceof ProposedItem));

// Convert patches to upserts if needed
LinkedList<ChangeMCP> upsertBatchItems =
items.stream()
Stream.concat(mutatedProposalsStream, changeMCPStream)
.map(
item -> {
final String urnStr = item.getUrn().toString();
Expand Down Expand Up @@ -85,6 +98,17 @@ public Pair<Map<String, Set<String>>, List<ChangeMCP>> toUpsertBatchItems(
return Pair.of(newUrnAspectNames, upsertBatchItems);
}

/**
 * Passes the proposed items through the proposal mutation hooks and converts every emitted item
 * that carries a MetadataChangeProposal into a change item.
 *
 * <p>NOTE(review): the result is built solely from what the hooks emit, so a proposed item that
 * no hook returns does not appear in the output — confirm this is intended.
 *
 * @param proposedItems unvalidated proposal items
 * @return change items built from the hook output
 */
private Stream<ChangeMCP> proposedItemsToChangeItemStream(List<MCPItem> proposedItems) {
  final Stream<MCPItem> mutatedWithProposal =
      applyProposalMutationHooks(proposedItems, retrieverContext)
          .filter(candidate -> candidate.getMetadataChangeProposal() != null);

  return mutatedWithProposal.map(
      candidate -> {
        return ChangeItemImpl.ChangeItemImplBuilder.build(
            candidate.getMetadataChangeProposal(),
            candidate.getAuditStamp(),
            retrieverContext.getAspectRetriever());
      });
}

public static class AspectsBatchImplBuilder {
/**
* Just one aspect record template
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.linkedin.metadata.entity.ebean.batch;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
 * Represents an unvalidated wrapped MCP.
 *
 * <p>Most accessors delegate directly to the wrapped {@link MetadataChangeProposal}. The aspect
 * spec may be absent: it is either supplied through the builder or looked up in the entity spec
 * by the proposal's aspect name.
 */
@Slf4j
@Getter
@Builder(toBuilder = true)
public class ProposedItem implements MCPItem {
  @Nonnull private final MetadataChangeProposal metadataChangeProposal;
  @Nonnull private final AuditStamp auditStamp;
  // derived
  @Nonnull private EntitySpec entitySpec;
  @Nullable private AspectSpec aspectSpec;

  /**
   * Returns the aspect name declared on the proposal, or the interface default when the proposal
   * does not declare one.
   *
   * @return aspect name for this item
   */
  @Nonnull
  @Override
  public String getAspectName() {
    final String proposedAspectName = metadataChangeProposal.getAspectName();
    if (proposedAspectName != null) {
      return proposedAspectName;
    }
    return MCPItem.super.getAspectName();
  }

  /**
   * Resolves the aspect spec for this item.
   *
   * @return the explicitly configured spec, else the entity spec's entry for the proposal's
   *     aspect name, else {@code null}
   */
  @Nullable
  @Override
  public AspectSpec getAspectSpec() {
    if (aspectSpec != null) {
      return aspectSpec;
    }
    // Resolve from the proposal's own aspect name rather than getAspectName(): the inherited
    // default of getAspectName() consults getAspectSpec(), so going through it here would
    // recurse without end when the proposal carries no aspect name.
    final String proposedAspectName = metadataChangeProposal.getAspectName();
    if (proposedAspectName == null) {
      return null;
    }
    return entitySpec.getAspectSpecMap().get(proposedAspectName);
  }

  /**
   * Deserializes the proposal's aspect payload using the resolved aspect spec.
   *
   * @return the deserialized aspect, or {@code null} when no aspect spec can be resolved or the
   *     proposal carries no aspect payload
   */
  @Nullable
  @Override
  public RecordTemplate getRecordTemplate() {
    final AspectSpec resolvedSpec = getAspectSpec();
    // The aspect payload is optional on a proposal; without it there is nothing to deserialize.
    if (resolvedSpec == null || metadataChangeProposal.getAspect() == null) {
      return null;
    }
    return GenericRecordUtils.deserializeAspect(
        metadataChangeProposal.getAspect().getValue(),
        metadataChangeProposal.getAspect().getContentType(),
        resolvedSpec);
  }

  /** @return the entity URN of the wrapped proposal */
  @Nonnull
  @Override
  public Urn getUrn() {
    return metadataChangeProposal.getEntityUrn();
  }

  /** @return the system metadata of the wrapped proposal, possibly {@code null} */
  @Nullable
  @Override
  public SystemMetadata getSystemMetadata() {
    return metadataChangeProposal.getSystemMetadata();
  }

  /** @return the change type of the wrapped proposal */
  @Nonnull
  @Override
  public ChangeType getChangeType() {
    return metadataChangeProposal.getChangeType();
  }
}
Loading

0 comments on commit eaf2bed

Please sign in to comment.