Skip to content

Commit

Permalink
feat(mcp-mutator): new mcp mutator plugin
Browse files Browse the repository at this point in the history
* wip
  • Loading branch information
david-leifker committed Jul 12, 2024
1 parent d77d565 commit 391bdc0
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.SystemMetadata;
import java.lang.reflect.InvocationTargetException;
import javax.annotation.Nonnull;
Expand All @@ -26,6 +27,9 @@ public interface ReadItem {
*/
@Nonnull
default String getAspectName() {
if (getAspectSpec() == null) {
return GenericAspect.dataSchema().getName();
}
return getAspectSpec().getName();
}

Expand Down Expand Up @@ -72,6 +76,6 @@ static <T> T getAspect(Class<T> clazz, @Nullable RecordTemplate recordTemplate)
*
* @return aspect's specification
*/
@Nonnull
@Nullable
AspectSpec getAspectSpec();
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ static void applyWriteMutationHooks(
}
}

default Stream<ChangeMCP> applyProposalMutationHooks(
Collection<MCPItem> proposedItems, @Nonnull RetrieverContext retrieverContext) {
return retrieverContext.getAspectRetriever().getEntityRegistry().getAllMutationHooks().stream()
.flatMap(mutationHook -> mutationHook.applyProposalMutation(proposedItems, retrieverContext));
}

default <T extends BatchItem> ValidationExceptionCollection validateProposed(
Collection<T> mcpItems) {
return validateProposed(mcpItems, getRetrieverContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.metadata.aspect.ReadItem;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.plugins.PluginSpec;
import com.linkedin.util.Pair;
import java.util.Collection;
Expand Down Expand Up @@ -39,6 +40,21 @@ public final Stream<Pair<ReadItem, Boolean>> applyReadMutation(
retrieverContext);
}

/**
* Apply Proposal mutations prior to validation
* @param mcpItems wrapper for MCP
* @param retrieverContext retriever context
* @return stream of mutated items
*/
public final Stream<ChangeMCP> applyProposalMutation(
@Nonnull Collection<MCPItem> mcpItems, @Nonnull RetrieverContext retrieverContext) {
return proposalMutation(
mcpItems.stream()
.filter(i -> shouldApply(i.getChangeType(), i.getEntitySpec(), i.getAspectSpec()))
.collect(Collectors.toList()),
retrieverContext);
}

protected Stream<Pair<ReadItem, Boolean>> readMutation(
@Nonnull Collection<ReadItem> items, @Nonnull RetrieverContext retrieverContext) {
return items.stream().map(i -> Pair.of(i, false));
Expand All @@ -48,4 +64,9 @@ protected Stream<Pair<ChangeMCP, Boolean>> writeMutation(
@Nonnull Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
return changeMCPS.stream().map(i -> Pair.of(i, false));
}

protected Stream<ChangeMCP> proposalMutation(
@Nonnull Collection<MCPItem> mcpItems, @Nonnull RetrieverContext retrieverContext) {
return Stream.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.util.Pair;
Expand All @@ -18,6 +19,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -44,9 +46,18 @@ public class AspectsBatchImpl implements AspectsBatch {
public Pair<Map<String, Set<String>>, List<ChangeMCP>> toUpsertBatchItems(
final Map<String, Map<String, SystemAspect>> latestAspects) {

// Process proposals to change items
Stream<ChangeMCP> mutatedProposalsStream = proposedItemsToChangeItemStream(items.stream()
.filter(item -> item instanceof ProposedItem)
.map(item -> (MCPItem) item)
.collect(Collectors.toList()));
// Regular change items
Stream<MCPItem> changeMCPStream = items.stream()
.filter(item -> !(item instanceof ProposedItem))
.map(item -> (ChangeMCP) item);

// Convert patches to upserts if needed
LinkedList<ChangeMCP> upsertBatchItems =
items.stream()
LinkedList<ChangeMCP> upsertBatchItems = Stream.concat(mutatedProposalsStream, changeMCPStream)
.map(
item -> {
final String urnStr = item.getUrn().toString();
Expand Down Expand Up @@ -85,6 +96,10 @@ public Pair<Map<String, Set<String>>, List<ChangeMCP>> toUpsertBatchItems(
return Pair.of(newUrnAspectNames, upsertBatchItems);
}

private Stream<ChangeMCP> proposedItemsToChangeItemStream(List<MCPItem> proposedItems) {
return applyProposalMutationHooks(proposedItems, retrieverContext);
}

public static class AspectsBatchImplBuilder {
/**
* Just one aspect record template
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.linkedin.metadata.entity.ebean.batch;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
* Represents an unvalidated wrapped MCP
*/
@Slf4j
@Getter
@Builder(toBuilder = true)
public class ProposedItem implements MCPItem {
@Nonnull private final ChangeType changeType;
// urn an urn associated with the new aspect
@Nonnull private final Urn urn;
@Nullable
private final MetadataChangeProposal metadataChangeProposal;
@Nonnull private SystemMetadata systemMetadata;
@Nonnull
private final AuditStamp auditStamp;
@Nonnull private final RecordTemplate recordTemplate;
// derived
@Nonnull private EntitySpec entitySpec;
@Nullable private AspectSpec aspectSpec;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class RequestContext implements ContextInterface {
@Nonnull private final String requestID;

@Nonnull private final String userAgent;
@Builder.Default
private final boolean validated = true;

public RequestContext(
@Nonnull String actorUrn,
Expand Down

0 comments on commit 391bdc0

Please sign in to comment.