diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/ReadItem.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/ReadItem.java index 342b5376d8a755..106596bf80ccf0 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/ReadItem.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/ReadItem.java @@ -5,6 +5,7 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.mxe.GenericAspect; import com.linkedin.mxe.SystemMetadata; import java.lang.reflect.InvocationTargetException; import javax.annotation.Nonnull; @@ -26,6 +27,9 @@ public interface ReadItem { */ @Nonnull default String getAspectName() { + if (getAspectSpec() == null) { + return GenericAspect.dataSchema().getName(); + } return getAspectSpec().getName(); } @@ -72,6 +76,6 @@ static T getAspect(Class clazz, @Nullable RecordTemplate recordTemplate) * * @return aspect's specification */ - @Nonnull + @Nullable AspectSpec getAspectSpec(); } diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java index a302632e1936fd..6ae4c10545d66d 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java @@ -84,6 +84,12 @@ static void applyWriteMutationHooks( } } + default Stream applyProposalMutationHooks( + Collection proposedItems, @Nonnull RetrieverContext retrieverContext) { + return retrieverContext.getAspectRetriever().getEntityRegistry().getAllMutationHooks().stream() + .flatMap(mutationHook -> mutationHook.applyProposalMutation(proposedItems, retrieverContext)); + } + default ValidationExceptionCollection validateProposed( Collection mcpItems) { return validateProposed(mcpItems, getRetrieverContext()); diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MutationHook.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MutationHook.java index c067954912a032..f2d3893fc6c153 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MutationHook.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MutationHook.java @@ -3,6 +3,7 @@ import com.linkedin.metadata.aspect.ReadItem; import com.linkedin.metadata.aspect.RetrieverContext; import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.batch.MCPItem; import com.linkedin.metadata.aspect.plugins.PluginSpec; import com.linkedin.util.Pair; import java.util.Collection; @@ -39,6 +40,21 @@ public final Stream> applyReadMutation( retrieverContext); } + /** + * Apply Proposal mutations prior to validation + * @param mcpItems wrapper for MCP + * @param retrieverContext retriever context + * @return stream of mutated items + */ + public final Stream applyProposalMutation( + @Nonnull Collection mcpItems, @Nonnull RetrieverContext retrieverContext) { + return proposalMutation( + mcpItems.stream() + .filter(i -> shouldApply(i.getChangeType(), i.getEntitySpec(), i.getAspectSpec())) + .collect(Collectors.toList()), + retrieverContext); + } + protected Stream> readMutation( @Nonnull Collection items, @Nonnull RetrieverContext retrieverContext) { return items.stream().map(i -> Pair.of(i, false)); @@ -48,4 +64,9 @@ protected Stream> writeMutation( @Nonnull Collection changeMCPS, @Nonnull RetrieverContext retrieverContext) { return changeMCPS.stream().map(i -> Pair.of(i, false)); } + + protected Stream proposalMutation( + @Nonnull Collection mcpItems, @Nonnull RetrieverContext retrieverContext) { + return Stream.empty(); + } } diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java index 0914df744e413a..5f9a6aafdda3ce 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java @@ -8,6 +8,7 @@ import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.batch.MCPItem; import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.util.Pair; @@ -18,6 +19,7 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nonnull; import lombok.Builder; import lombok.Getter; @@ -44,9 +46,18 @@ public class AspectsBatchImpl implements AspectsBatch { public Pair>, List> toUpsertBatchItems( final Map> latestAspects) { + // Process proposals to change items + Stream mutatedProposalsStream = proposedItemsToChangeItemStream(items.stream() + .filter(item -> item instanceof ProposedItem) + .map(item -> (MCPItem) item) + .collect(Collectors.toList())); + // Regular change items + Stream changeMCPStream = items.stream() + .filter(item -> !(item instanceof ProposedItem)) + .map(item -> (ChangeMCP) item); + // Convert patches to upserts if needed - LinkedList upsertBatchItems = - items.stream() + LinkedList upsertBatchItems = Stream.concat(mutatedProposalsStream, changeMCPStream) .map( item -> { final String urnStr = item.getUrn().toString(); @@ -85,6 +96,10 @@ public Pair>, List> toUpsertBatchItems( return Pair.of(newUrnAspectNames, upsertBatchItems); } + private Stream proposedItemsToChangeItemStream(List proposedItems) { + return applyProposalMutationHooks(proposedItems, retrieverContext); + } + public static class AspectsBatchImplBuilder { /** * Just one aspect record template diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java new file mode 100644 index 00000000000000..a84143f9a445bc --- /dev/null +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java @@ -0,0 +1,37 @@ +package com.linkedin.metadata.entity.ebean.batch; + +import com.linkedin.common.AuditStamp; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.batch.MCPItem; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.mxe.SystemMetadata; +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +/** + * Represents an unvalidated wrapped MCP + */ +@Slf4j +@Getter +@Builder(toBuilder = true) +public class ProposedItem implements MCPItem { + @Nonnull private final ChangeType changeType; + // urn an urn associated with the new aspect + @Nonnull private final Urn urn; + @Nullable + private final MetadataChangeProposal metadataChangeProposal; + @Nonnull private SystemMetadata systemMetadata; + @Nonnull + private final AuditStamp auditStamp; + @Nonnull private final RecordTemplate recordTemplate; + // derived + @Nonnull private EntitySpec entitySpec; + @Nullable private AspectSpec aspectSpec; +} diff --git a/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java b/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java index dcea185fcbc7ca..fd63e18ecbb877 100644 --- a/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java +++ b/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java @@ -35,6 +35,8 @@ public class RequestContext implements ContextInterface { @Nonnull private final String requestID; @Nonnull private final String userAgent; + @Builder.Default + private final boolean validated = true; public RequestContext( @Nonnull String actorUrn,