diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java index cac9b5f9483d4..9c0c82dba65be 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java @@ -1,5 +1,6 @@ package com.linkedin.datahub.upgrade.config; +import com.datahub.authentication.Authentication; import com.linkedin.datahub.upgrade.system.BlockingSystemUpgrade; import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; import com.linkedin.datahub.upgrade.system.SystemUpdate; @@ -11,12 +12,25 @@ import com.linkedin.gms.factory.kafka.common.TopicConventionFactory; import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory; import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig; +import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory; +import com.linkedin.metadata.aspect.GraphRetriever; import com.linkedin.metadata.config.kafka.KafkaConfiguration; import com.linkedin.metadata.dao.producer.KafkaEventProducer; import com.linkedin.metadata.dao.producer.KafkaHealthChecker; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.EntityServiceAspectRetriever; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.search.SearchService; +import com.linkedin.metadata.search.SearchServiceSearchRetriever; import com.linkedin.metadata.version.GitVersion; import com.linkedin.mxe.TopicConvention; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.metadata.context.OperationContextConfig; +import io.datahubproject.metadata.context.RetrieverContext; +import io.datahubproject.metadata.context.ServicesRegistryContext; +import io.datahubproject.metadata.services.RestrictedService; import java.util.List; +import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.IndexedRecord; import org.apache.kafka.clients.producer.KafkaProducer; @@ -120,4 +134,46 @@ protected SchemaRegistryConfig schemaRegistryConfig( @Qualifier("duheSchemaRegistryConfig") SchemaRegistryConfig duheSchemaRegistryConfig) { return duheSchemaRegistryConfig; } + + @Primary + @Nonnull + @Bean(name = "systemOperationContext") + protected OperationContext javaSystemOperationContext( + @Nonnull @Qualifier("systemAuthentication") final Authentication systemAuthentication, + @Nonnull final OperationContextConfig operationContextConfig, + @Nonnull final EntityRegistry entityRegistry, + @Nonnull final EntityService entityService, + @Nonnull final RestrictedService restrictedService, + @Nonnull final GraphRetriever graphRetriever, + @Nonnull final SearchService searchService, + @Qualifier("baseElasticSearchComponents") + BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components) { + + EntityServiceAspectRetriever entityServiceAspectRetriever = + EntityServiceAspectRetriever.builder() + .entityRegistry(entityRegistry) + .entityService(entityService) + .build(); + + SearchServiceSearchRetriever searchServiceSearchRetriever = + SearchServiceSearchRetriever.builder().searchService(searchService).build(); + + OperationContext systemOperationContext = + OperationContext.asSystem( + operationContextConfig, + systemAuthentication, + entityServiceAspectRetriever.getEntityRegistry(), + ServicesRegistryContext.builder().restrictedService(restrictedService).build(), + components.getIndexConvention(), + RetrieverContext.builder() + .aspectRetriever(entityServiceAspectRetriever) + .graphRetriever(graphRetriever) + .searchRetriever(searchServiceSearchRetriever) + .build()); + + entityServiceAspectRetriever.setSystemOperationContext(systemOperationContext); + searchServiceSearchRetriever.setSystemOperationContext(systemOperationContext); + + return systemOperationContext; + } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadata.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadata.java new file mode 100644 index 0000000000000..20bc65bf15dae --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadata.java @@ -0,0 +1,48 @@ +package com.linkedin.datahub.upgrade.system.schemafield; + +import com.google.common.collect.ImmutableList; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; +import com.linkedin.metadata.entity.AspectDao; +import com.linkedin.metadata.entity.EntityService; +import io.datahubproject.metadata.context.OperationContext; +import java.util.List; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; + +/** + * A {@link NonBlockingSystemUpgrade} upgrade job that generates schema fields from schema metadata. + */ +@Slf4j +public class GenerateSchemaFieldsFromSchemaMetadata implements NonBlockingSystemUpgrade { + + private final List _steps; + + public GenerateSchemaFieldsFromSchemaMetadata( + @Nonnull OperationContext opContext, + EntityService entityService, + AspectDao aspectDao, + boolean enabled, + Integer batchSize, + Integer batchDelayMs, + Integer limit) { + if (enabled) { + _steps = + ImmutableList.of( + new GenerateSchemaFieldsFromSchemaMetadataStep( + opContext, entityService, aspectDao, batchSize, batchDelayMs, limit)); + } else { + _steps = ImmutableList.of(); + } + } + + @Override + public String id() { + return this.getClass().getName(); + } + + @Override + public List steps() { + return _steps; + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java new file mode 100644 index 0000000000000..8a7da2e685da8 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java @@ -0,0 +1,182 @@ +package com.linkedin.datahub.upgrade.system.schemafield; + +import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; +import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME; +import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; + +import com.google.common.annotations.VisibleForTesting; +import com.linkedin.common.urn.Urn; +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.batch.AspectsBatch; +import com.linkedin.metadata.boot.BootstrapStep; +import com.linkedin.metadata.entity.AspectDao; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.EntityUtils; +import com.linkedin.metadata.entity.ebean.EbeanAspectV2; +import com.linkedin.metadata.entity.ebean.PartitionedStream; +import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; +import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; +import io.datahubproject.metadata.context.OperationContext; +import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.Nullable; + +/** + * The `GenerateSchemaFieldsFromSchemaMetadataStep` class is an implementation of the `UpgradeStep` + * interface. This class is responsible for generating schema fields from schema metadata during an + * upgrade process. + * + *

The step performs the following actions: 1. Initializes with provided operation context, + * entity service, and aspect DAO. 2. Provides a unique identifier for the upgrade step. 3. + * Determines if the upgrade should be skipped based on the environment variable. 4. Executes the + * upgrade step which involves streaming aspects in batches, processing them, and updating schema + * fields. + * + *

This class utilizes various metadata and entity services to perform its operations, and + * includes configuration parameters such as batch size, delay between batches, and limits. + * + *

Environment Variables: - `SKIP_GENERATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA`: If set to `true`, + * the upgrade step is skipped. + * + *

Note: Schema Fields are generated with a status aspect to indicate presence of the field. No + * tags, documentation or other aspects are generated. We will write an upgrade to this job to + * generate the other aspects in the future (v2). + */ +@Slf4j +public class GenerateSchemaFieldsFromSchemaMetadataStep implements UpgradeStep { + + private final OperationContext opContext; + private final EntityService entityService; + private final AspectDao aspectDao; + + private final int batchSize; + private final int batchDelayMs; + private final int limit; + + public GenerateSchemaFieldsFromSchemaMetadataStep( + OperationContext opContext, + EntityService entityService, + AspectDao aspectDao, + Integer batchSize, + Integer batchDelayMs, + Integer limit) { + this.opContext = opContext; + this.entityService = entityService; + this.aspectDao = aspectDao; + this.batchSize = batchSize; + this.batchDelayMs = batchDelayMs; + this.limit = limit; + log.info("GenerateSchemaFieldsFromSchemaMetadataStep initialized"); + } + + @Override + public String id() { + return "schema-field-from-schema-metadata-v1"; + } + + @VisibleForTesting + @Nullable + public String getUrnLike() { + return "urn:li:" + DATASET_ENTITY_NAME + ":%"; + } + + /** + * Returns whether the upgrade should be skipped. Uses previous run history or the environment + * variable SKIP_GENERATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA to determine whether to skip. + */ + public boolean skip(UpgradeContext context) { + boolean envFlagRecommendsSkip = + Boolean.parseBoolean(System.getenv("SKIP_GENERATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA")); + if (envFlagRecommendsSkip) { + log.info( + "Environment variable SKIP_GENERATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA is set to true. Skipping."); + } + return envFlagRecommendsSkip; + } + + protected Urn getUpgradeIdUrn() { + return BootstrapStep.getUpgradeUrn(id()); + } + + @Override + public Function executable() { + log.info("Starting GenerateSchemaFieldsFromSchemaMetadataStep"); + return (context) -> { + + // re-using for configuring the sql scan + RestoreIndicesArgs args = + new RestoreIndicesArgs() + .aspectNames(List.of(SCHEMA_METADATA_ASPECT_NAME, STATUS_ASPECT_NAME)) + .batchSize(batchSize) + .limit(limit); + + if (getUrnLike() != null) { + args = args.urnLike(getUrnLike()); + } + + try (PartitionedStream stream = aspectDao.streamAspectBatches(args)) { + stream + .partition(args.batchSize) + .forEach( + batch -> { + log.info("Processing batch of size {}.", batchSize); + + AspectsBatch aspectsBatch = + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items( + batch + .flatMap( + ebeanAspectV2 -> + EntityUtils.toSystemAspectFromEbeanAspects( + opContext.getRetrieverContext().get(), + Set.of(ebeanAspectV2)) + .stream()) + .map( + systemAspect -> + ChangeItemImpl.builder() + .changeType(ChangeType.UPSERT) + .urn(systemAspect.getUrn()) + .entitySpec(systemAspect.getEntitySpec()) + .aspectName(systemAspect.getAspectName()) + .aspectSpec(systemAspect.getAspectSpec()) + .recordTemplate(systemAspect.getRecordTemplate()) + .auditStamp(systemAspect.getAuditStamp()) + .systemMetadata(systemAspect.getSystemMetadata()) + .build( + opContext + .getRetrieverContext() + .get() + .getAspectRetriever())) + .collect(Collectors.toList())) + .build(); + + // re-ingest the aspects to trigger side effects + entityService.ingestAspects(opContext, aspectsBatch, true, false); + + if (batchDelayMs > 0) { + log.info("Sleeping for {} ms", batchDelayMs); + try { + Thread.sleep(batchDelayMs); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + } + + BootstrapStep.setUpgradeResult(opContext, getUpgradeIdUrn(), entityService); + context.report().addLine("State updated: " + getUpgradeIdUrn()); + + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); + }; + } +} diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/schemafield/GenerateSchemaFieldsFromSchemaMetadataStepTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/schemafield/GenerateSchemaFieldsFromSchemaMetadataStepTest.java new file mode 100644 index 0000000000000..040ea64b2be7e --- /dev/null +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/schemafield/GenerateSchemaFieldsFromSchemaMetadataStepTest.java @@ -0,0 +1,109 @@ +package com.linkedin.datahub.upgrade.schemafield; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.system.schemafield.GenerateSchemaFieldsFromSchemaMetadataStep; +import com.linkedin.metadata.aspect.SystemAspect; +import com.linkedin.metadata.entity.AspectDao; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.EbeanAspectV2; +import com.linkedin.metadata.entity.ebean.PartitionedStream; +import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; +import com.linkedin.schema.SchemaMetadata; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.metadata.context.RetrieverContext; +import java.util.Optional; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class GenerateSchemaFieldsFromSchemaMetadataStepTest { + + @Mock private OperationContext mockOpContext; + + @Mock private EntityService mockEntityService; + + @Mock private AspectDao mockAspectDao; + + @Mock private RetrieverContext mockRetrieverContext; + + private GenerateSchemaFieldsFromSchemaMetadataStep step; + + @BeforeEach + public void setup() { + MockitoAnnotations.openMocks(this); + step = + new GenerateSchemaFieldsFromSchemaMetadataStep( + mockOpContext, mockEntityService, mockAspectDao, 10, 100, 1000); + when(mockOpContext.getRetrieverContext()).thenReturn(Optional.of(mockRetrieverContext)); + } + + /** Test to verify the correct step ID is returned. */ + @Test + public void testId() { + assertEquals("schema-field-from-schema-metadata-v1", step.id()); + } + + /** Test to verify the skip logic based on the environment variable. */ + @Test + public void testSkip() { + UpgradeContext mockContext = mock(UpgradeContext.class); + System.setProperty("SKIP_GENERATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA", "true"); + assertTrue(step.skip(mockContext)); + + System.setProperty("SKIP_GENERATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA", "false"); + assertFalse(step.skip(mockContext)); + } + + /** Test to verify the correct URN pattern is returned. */ + @Test + public void testGetUrnLike() { + assertEquals("urn:li:dataset:%", step.getUrnLike()); + } + + /** + * Test to verify the executable function processes batches correctly and returns a success + * result. + */ + @Test + public void testExecutable() { + UpgradeContext mockContext = mock(UpgradeContext.class); + + EbeanAspectV2 mockAspect = mock(EbeanAspectV2.class); + PartitionedStream mockStream = mock(PartitionedStream.class); + when(mockAspectDao.streamAspectBatches(any(RestoreIndicesArgs.class))).thenReturn(mockStream); + + when(mockStream.partition(anyInt())).thenReturn(Stream.of(Stream.of(mockAspect))); + + SystemAspect mockSystemAspect = mock(SystemAspect.class); + when(mockSystemAspect.getAspectName()).thenReturn("schemaMetadata"); + when(mockSystemAspect.getAspect(SchemaMetadata.class)).thenReturn(new SchemaMetadata()); + + // when(mockRetrieverContext.getAspectRetriever()).thenReturn(mockSystemAspect); + + ArgumentCaptor argsCaptor = + ArgumentCaptor.forClass(RestoreIndicesArgs.class); + + UpgradeStepResult result = step.executable().apply(mockContext); + assertEquals(UpgradeStepResult.Result.SUCCEEDED, result.result()); + + verify(mockAspectDao).streamAspectBatches(argsCaptor.capture()); + assertEquals("schemaMetadata", argsCaptor.getValue().aspectName()); + assertEquals(10, argsCaptor.getValue().batchSize()); + assertEquals(1000, argsCaptor.getValue().limit()); + } + + // Additional tests can be added to cover more scenarios and edge cases +} diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java index 77820948b00cb..dc7934ad5cc19 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java @@ -9,6 +9,7 @@ import com.linkedin.util.Pair; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -198,16 +199,13 @@ default Map> getNewUrnAspectsMap( static Map> merge( @Nonnull Map> a, @Nonnull Map> b) { - return Stream.concat(a.entrySet().stream(), b.entrySet().stream()) - .flatMap( - entry -> - entry.getValue().entrySet().stream() - .map(innerEntry -> Pair.of(entry.getKey(), innerEntry))) - .collect( - Collectors.groupingBy( - Pair::getKey, - Collectors.mapping( - Pair::getValue, Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))); + + Map> mergedMap = new HashMap<>(); + for (Map.Entry> entry : + Stream.concat(a.entrySet().stream(), b.entrySet().stream()).collect(Collectors.toList())) { + mergedMap.computeIfAbsent(entry.getKey(), k -> new HashMap<>()).putAll(entry.getValue()); + } + return mergedMap; } default String toAbbreviatedString(int maxWidth) { diff --git a/li-utils/src/main/java/com/linkedin/metadata/Constants.java b/li-utils/src/main/java/com/linkedin/metadata/Constants.java index 9a7b8287e2c6a..d0c1ef49643fe 100644 --- a/li-utils/src/main/java/com/linkedin/metadata/Constants.java +++ b/li-utils/src/main/java/com/linkedin/metadata/Constants.java @@ -82,6 +82,7 @@ public class Constants { public static final String DATAHUB_ROLE_ENTITY_NAME = "dataHubRole"; public static final String POST_ENTITY_NAME = "post"; public static final String SCHEMA_FIELD_ENTITY_NAME = "schemaField"; + public static final String SCHEMA_FIELD_KEY_ASPECT = "schemaFieldKey"; public static final String DATAHUB_STEP_STATE_ENTITY_NAME = "dataHubStepState"; public static final String DATAHUB_VIEW_ENTITY_NAME = "dataHubView"; public static final String QUERY_ENTITY_NAME = "query"; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java index 9725abdf7fdc2..524e947476122 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java @@ -512,6 +512,9 @@ public PartitionedStream streamAspectBatches(final RestoreIndices if (args.aspectName != null) { exp = exp.eq(EbeanAspectV2.ASPECT_COLUMN, args.aspectName); } + if (args.aspectNames != null && !args.aspectNames.isEmpty()) { + exp = exp.in(EbeanAspectV2.ASPECT_COLUMN, args.aspectNames); + } if (args.urn != null) { exp = exp.eq(EbeanAspectV2.URN_COLUMN, args.urn); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/schemafields/sideeffects/SchemaFieldSideEffect.java b/metadata-io/src/main/java/com/linkedin/metadata/schemafields/sideeffects/SchemaFieldSideEffect.java new file mode 100644 index 0000000000000..170cfc277ce94 --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/schemafields/sideeffects/SchemaFieldSideEffect.java @@ -0,0 +1,402 @@ +package com.linkedin.metadata.schemafields.sideeffects; + +import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; +import static com.linkedin.metadata.Constants.SCHEMA_FIELD_ENTITY_NAME; +import static com.linkedin.metadata.Constants.SCHEMA_FIELD_KEY_ASPECT; +import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME; +import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; + +import com.linkedin.common.Status; +import com.linkedin.common.urn.Urn; +import com.linkedin.entity.Aspect; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.aspect.AspectRetriever; +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.batch.BatchItem; +import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.batch.MCLItem; +import com.linkedin.metadata.aspect.batch.MCPItem; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.hooks.MCPSideEffect; +import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.entity.ebean.batch.DeleteItemImpl; +import com.linkedin.metadata.key.SchemaFieldKey; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.schema.SchemaField; +import com.linkedin.schema.SchemaMetadata; +import com.linkedin.util.Pair; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Getter +@Setter +@Accessors(chain = true) +public class SchemaFieldSideEffect extends MCPSideEffect { + @Nonnull private AspectPluginConfig config; + + private static final Set REQUIRED_ASPECTS = + Set.of(Constants.SCHEMA_METADATA_ASPECT_NAME, Constants.STATUS_ASPECT_NAME); + + @Override + protected Stream applyMCPSideEffect( + Collection changeMCPs, @Nonnull RetrieverContext retrieverContext) { + + // fetch existing aspects + Map> aspectData = + fetchRequiredAspects( + changeMCPs.stream().map(i -> (BatchItem) i).collect(Collectors.toList()), + REQUIRED_ASPECTS, + retrieverContext.getAspectRetriever()); + + Map> batchMCPAspectNames = + changeMCPs.stream() + .collect( + Collectors.groupingBy( + ChangeMCP::getUrn, + Collectors.mapping(ChangeMCP::getAspectName, Collectors.toSet()))); + + Stream schemaFieldSideEffects = + changeMCPs.stream() + .filter( + changeMCP -> + DATASET_ENTITY_NAME.equals(changeMCP.getUrn().getEntityType()) + && Constants.SCHEMA_METADATA_ASPECT_NAME.equals(changeMCP.getAspectName())) + .flatMap( + item -> + buildSchemaFieldKeyMCPs( + item, aspectData, retrieverContext.getAspectRetriever())); + + Stream statusSideEffects = + changeMCPs.stream() + .filter( + changeMCP -> + DATASET_ENTITY_NAME.equals(changeMCP.getUrn().getEntityType()) + && Constants.STATUS_ASPECT_NAME.equals(changeMCP.getAspectName()) + // if present, already computed above + && !batchMCPAspectNames + .getOrDefault(changeMCP.getUrn(), Set.of()) + .contains(Constants.SCHEMA_METADATA_ASPECT_NAME)) + .flatMap( + item -> + mirrorStatusAspect(item, aspectData, retrieverContext.getAspectRetriever())); + + return Stream.concat(statusSideEffects, schemaFieldSideEffects); + } + + /** + * Handle delete of dataset schema metadata + * + * @param mclItems MCL items generated from committing the MCP + * @param retrieverContext accessors for aspect and graph data + * @return + */ + @Override + protected Stream postMCPSideEffect( + Collection mclItems, @Nonnull RetrieverContext retrieverContext) { + + List schemaMetadataDeletes = + mclItems.stream() + .filter( + item -> + ChangeType.DELETE.equals(item.getChangeType()) + && DATASET_ENTITY_NAME.equals(item.getUrn().getEntityType()) + && SCHEMA_METADATA_ASPECT_NAME.equals(item.getAspectName())) + .collect(Collectors.toList()); + Stream schemaMetadataSchemaFieldMCPs = + schemaMetadataDeletes.stream() + .flatMap( + item -> + buildSchemaMetadataSchemaFieldDeleteMCPs( + item, retrieverContext.getAspectRetriever())); + + List statusDeletes = + mclItems.stream() + .filter( + item -> + ChangeType.DELETE.equals(item.getChangeType()) + && DATASET_ENTITY_NAME.equals(item.getUrn().getEntityType()) + && STATUS_ASPECT_NAME.equals(item.getAspectName())) + .collect(Collectors.toList()); + final Stream statusSchemaFieldMCPs; + if (statusDeletes.isEmpty()) { + statusSchemaFieldMCPs = Stream.empty(); + } else { + Map> aspectData = + fetchRequiredAspects( + statusDeletes.stream().map(item -> (BatchItem) item).collect(Collectors.toList()), + Set.of(SCHEMA_METADATA_ASPECT_NAME), + retrieverContext.getAspectRetriever()); + statusSchemaFieldMCPs = + statusDeletes.stream() + .flatMap( + item -> + buildStatusSchemaFieldDeleteMCPs( + item, + Optional.ofNullable( + aspectData + .getOrDefault(item.getUrn(), Map.of()) + .get(SCHEMA_METADATA_ASPECT_NAME)) + .map(aspect -> new SchemaMetadata(aspect.data())) + .orElse(null), + retrieverContext.getAspectRetriever())); + } + + return Stream.concat(schemaMetadataSchemaFieldMCPs, statusSchemaFieldMCPs); + } + + /** + * Copy aspect from dataset to schema fields + * + * @param parentDatasetStatusItem dataset mcp item + * @param aspectRetriever aspectRetriever context + * @return side effect schema field aspects + */ + private static Stream mirrorStatusAspect( + BatchItem parentDatasetStatusItem, + Map> aspectData, + @Nonnull AspectRetriever aspectRetriever) { + + SchemaMetadata schemaMetadata = + Optional.ofNullable( + aspectData + .getOrDefault(parentDatasetStatusItem.getUrn(), Map.of()) + .getOrDefault(Constants.SCHEMA_METADATA_ASPECT_NAME, null)) + .map(aspect -> new SchemaMetadata(aspect.data())) + .orElse(null); + + return buildSchemaFieldStatusMCPs(parentDatasetStatusItem, schemaMetadata, aspectRetriever); + } + + /** + * Build a schema field status MCP based on the input status from the dataset + * + * @param parentDatasetStatusItem dataset's status aspect + * @param parentDatasetSchemaMetadata dataset's schema metadata + * @param aspectRetriever aspect retriever + * @return stream of status aspects for schema fields + */ + public static Stream buildSchemaFieldStatusMCPs( + @Nonnull BatchItem parentDatasetStatusItem, + @Nullable SchemaMetadata parentDatasetSchemaMetadata, + @Nonnull AspectRetriever aspectRetriever) { + if (parentDatasetSchemaMetadata == null) { + return Stream.empty(); + } else { + return parentDatasetSchemaMetadata.getFields().stream() + .map( + schemaField -> + ChangeItemImpl.builder() + .urn(getSchemaFieldUrn(parentDatasetStatusItem.getUrn(), schemaField)) + .changeType(ChangeType.UPSERT) + .aspectName(parentDatasetStatusItem.getAspectName()) + .recordTemplate(parentDatasetStatusItem.getRecordTemplate()) + .auditStamp(parentDatasetStatusItem.getAuditStamp()) + .systemMetadata(parentDatasetStatusItem.getSystemMetadata()) + .build(aspectRetriever)); + } + } + + /** + * Expand dataset schemaMetadata to schemaFields + * + * @param parentDatasetMetadataSchemaItem dataset mcp item + * @param aspectRetriever aspectRetriever context + * @return side effect schema field aspects + */ + private static Stream buildSchemaFieldKeyMCPs( + BatchItem parentDatasetMetadataSchemaItem, + Map> aspectData, + @Nonnull AspectRetriever aspectRetriever) { + + Stream schemaFieldKeys = + buildSchemaFieldKeyMCPs(parentDatasetMetadataSchemaItem, aspectRetriever); + + // Handle case where dataset status created before schema metadata + final Stream statusSideEffects; + if (aspectData + .getOrDefault(parentDatasetMetadataSchemaItem.getUrn(), Map.of()) + .containsKey(STATUS_ASPECT_NAME)) { + Status status = + new Status( + aspectData + .get(parentDatasetMetadataSchemaItem.getUrn()) + .get(STATUS_ASPECT_NAME) + .data()); + + ChangeItemImpl datasetStatusItem = + ChangeItemImpl.builder() + .changeType(ChangeType.UPSERT) + .urn(parentDatasetMetadataSchemaItem.getUrn()) + .aspectName(STATUS_ASPECT_NAME) + .entitySpec(parentDatasetMetadataSchemaItem.getEntitySpec()) + .aspectSpec( + parentDatasetMetadataSchemaItem.getEntitySpec().getAspectSpec(STATUS_ASPECT_NAME)) + .recordTemplate(status) + .auditStamp(parentDatasetMetadataSchemaItem.getAuditStamp()) + .systemMetadata(parentDatasetMetadataSchemaItem.getSystemMetadata()) + .build(aspectRetriever); + statusSideEffects = mirrorStatusAspect(datasetStatusItem, aspectData, aspectRetriever); + } else { + statusSideEffects = Stream.empty(); + } + + return Stream.concat(schemaFieldKeys, statusSideEffects); + } + + /** + * Given a dataset's metadata schema item, generate schema field key aspects + * + * @param parentDatasetMetadataSchemaItem dataset's metadata schema MCP + * @param aspectRetriever retriever + * @return stream of schema field MCPs for its key aspect + */ + private static Stream buildSchemaFieldKeyMCPs( + @Nonnull BatchItem parentDatasetMetadataSchemaItem, + @Nonnull AspectRetriever aspectRetriever) { + + SchemaMetadata schemaMetadata = parentDatasetMetadataSchemaItem.getAspect(SchemaMetadata.class); + + return schemaMetadata.getFields().stream() + .map( + schemaField -> + ChangeItemImpl.builder() + .urn(getSchemaFieldUrn(parentDatasetMetadataSchemaItem.getUrn(), schemaField)) + .changeType(ChangeType.UPSERT) + .aspectName(SCHEMA_FIELD_KEY_ASPECT) + .recordTemplate( + new SchemaFieldKey() + .setFieldPath(schemaField.getFieldPath()) + .setParent(parentDatasetMetadataSchemaItem.getUrn())) + .auditStamp(parentDatasetMetadataSchemaItem.getAuditStamp()) + .systemMetadata(parentDatasetMetadataSchemaItem.getSystemMetadata()) + .build(aspectRetriever)); + } + + /** + * Given a delete MCL for a dataset's schema metadata, generate delete MCPs for the schemaField + * aspects + * + * @param parentDatasetSchemaMetadataDelete the dataset's MCL from a metadata schema delete + * @param aspectRetriever retriever context + * @return follow-up deletes for the schema field + */ + public static Stream buildSchemaMetadataSchemaFieldDeleteMCPs( + @Nonnull MCLItem parentDatasetSchemaMetadataDelete, + @Nonnull AspectRetriever aspectRetriever) { + + EntityRegistry entityRegistry = aspectRetriever.getEntityRegistry(); + Urn datasetUrn = parentDatasetSchemaMetadataDelete.getUrn(); + SchemaMetadata parentDatasetSchemaMetadata = + parentDatasetSchemaMetadataDelete.getPreviousAspect(SchemaMetadata.class); + + return parentDatasetSchemaMetadata.getFields().stream() + .flatMap( + schemaField -> + entityRegistry.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME).getAspectSpecs().stream() + .map( + aspectSpec -> + DeleteItemImpl.builder() + .urn(getSchemaFieldUrn(datasetUrn, schemaField)) + .aspectName(aspectSpec.getName()) + .auditStamp(parentDatasetSchemaMetadataDelete.getAuditStamp()) + .entitySpec(entityRegistry.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) + .aspectSpec(aspectSpec) + .build(aspectRetriever))); + } + + public static Stream buildStatusSchemaFieldDeleteMCPs( + @Nonnull MCLItem parentDatasetStatusDelete, + @Nullable SchemaMetadata parentDatasetSchemaMetadata, + @Nonnull AspectRetriever aspectRetriever) { + + if (parentDatasetSchemaMetadata == null) { + return Stream.empty(); + } else { + EntityRegistry entityRegistry = aspectRetriever.getEntityRegistry(); + return parentDatasetSchemaMetadata.getFields().stream() + .map( + schemaField -> + DeleteItemImpl.builder() + .urn(getSchemaFieldUrn(parentDatasetStatusDelete.getUrn(), schemaField)) + .aspectName(STATUS_ASPECT_NAME) + .auditStamp(parentDatasetStatusDelete.getAuditStamp()) + .entitySpec(entityRegistry.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) + .aspectSpec(parentDatasetStatusDelete.getAspectSpec()) + .build(aspectRetriever)); + } + } + + /** + * Fetch missing aspects if not already in the batch + * + * @param batchItems batch mcps + * @param aspectRetriever aspect retriever + * @return required aspects for the side effect processing + */ + private static Map> fetchRequiredAspects( + Collection batchItems, + Set requiredAspectNames, + AspectRetriever aspectRetriever) { + Map> aspectData = new HashMap<>(); + + // Aspects included data in batch + batchItems.stream() + .filter(item -> item.getRecordTemplate() != null) + .forEach( + item -> + aspectData + .computeIfAbsent(item.getUrn(), k -> new HashMap<>()) + .put(item.getAspectName(), new Aspect(item.getRecordTemplate().data()))); + + // Aspect to fetch + Map> missingAspectData = + batchItems.stream() + .flatMap( + item -> + requiredAspectNames.stream() + .filter( + aspectName -> + !aspectData + .getOrDefault(item.getUrn(), Map.of()) + .containsKey(aspectName)) + .map(aspectName -> Pair.of(item.getUrn(), aspectName))) + .collect( + Collectors.groupingBy( + Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet()))); + + // Fetch missing + missingAspectData.forEach( + (urn, aspectNames) -> { + Map fetchedData = + aspectRetriever.getLatestAspectObjects(Set.of(urn), aspectNames).get(urn); + if (fetchedData != null) { + fetchedData.forEach( + (aspectName, aspectValue) -> + aspectData + .computeIfAbsent(urn, k -> new HashMap<>()) + .put(aspectName, aspectValue)); + } + }); + + return aspectData; + } + + private static Urn getSchemaFieldUrn(Urn parentUrn, SchemaField field) { + return Urn.createFromTuple( + SCHEMA_FIELD_ENTITY_NAME, parentUrn.toString(), field.getFieldPath()); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/schemafields/sideeffects/SchemaFieldSideEffectTest.java b/metadata-io/src/test/java/com/linkedin/metadata/schemafields/sideeffects/SchemaFieldSideEffectTest.java new file mode 100644 index 0000000000000..4a0c3f97d5834 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/schemafields/sideeffects/SchemaFieldSideEffectTest.java @@ -0,0 +1,476 @@ +package com.linkedin.metadata.schemafields.sideeffects; + +import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; +import static com.linkedin.metadata.Constants.SCHEMA_FIELD_ENTITY_NAME; +import static com.linkedin.metadata.Constants.SCHEMA_FIELD_KEY_ASPECT; +import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME; +import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +import com.linkedin.common.Status; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.ByteString; +import com.linkedin.entity.Aspect; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.AspectRetriever; +import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.batch.MCLItem; +import com.linkedin.metadata.aspect.batch.MCPItem; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.entity.SearchRetriever; +import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.entity.ebean.batch.DeleteItemImpl; +import com.linkedin.metadata.entity.ebean.batch.MCLItemImpl; +import com.linkedin.metadata.key.SchemaFieldKey; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.utils.AuditStampUtils; +import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.mxe.MetadataChangeLog; +import com.linkedin.schema.SchemaMetadata; +import com.linkedin.test.metadata.aspect.TestEntityRegistry; +import io.datahubproject.metadata.context.RetrieverContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class SchemaFieldSideEffectTest { + private static final EntityRegistry TEST_REGISTRY = new TestEntityRegistry(); + private static final List SUPPORTED_CHANGE_TYPES = + List.of( + ChangeType.CREATE, + ChangeType.CREATE_ENTITY, + ChangeType.UPSERT, + ChangeType.DELETE, + ChangeType.RESTATE); + private static final Urn TEST_URN = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)"); + private static final AspectPluginConfig TEST_PLUGIN_CONFIG = + AspectPluginConfig.builder() + .className(SchemaFieldSideEffect.class.getName()) + .enabled(true) + .supportedOperations( + SUPPORTED_CHANGE_TYPES.stream() + .map(ChangeType::toString) + .collect(Collectors.toList())) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName(DATASET_ENTITY_NAME) + .aspectName(SCHEMA_METADATA_ASPECT_NAME) + .build(), + AspectPluginConfig.EntityAspectName.builder() + .entityName(DATASET_ENTITY_NAME) + .aspectName(STATUS_ASPECT_NAME) + .build())) + .build(); + + private AspectRetriever mockAspectRetriever; + private RetrieverContext retrieverContext; + + @BeforeMethod + public void setup() { + mockAspectRetriever = mock(AspectRetriever.class); + when(mockAspectRetriever.getEntityRegistry()).thenReturn(TEST_REGISTRY); + retrieverContext = + RetrieverContext.builder() + .searchRetriever(mock(SearchRetriever.class)) + .aspectRetriever(mockAspectRetriever) + .graphRetriever(TestOperationContexts.emptyGraphRetriever) + .build(); + } + + @Test + public void schemaMetadataToSchemaFieldKeyTest() { + SchemaFieldSideEffect test = new SchemaFieldSideEffect(); + test.setConfig(TEST_PLUGIN_CONFIG); + SchemaMetadata schemaMetadata = getTestSchemaMetadata(); + + List testOutput; + for (ChangeType changeType : + List.of(ChangeType.CREATE, ChangeType.CREATE_ENTITY, ChangeType.UPSERT)) { + // Run test + ChangeItemImpl schemaMetadataChangeItem = + ChangeItemImpl.builder() + .urn(TEST_URN) + .aspectName(SCHEMA_METADATA_ASPECT_NAME) + .changeType(changeType) + .entitySpec(TEST_REGISTRY.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectSpec( + TEST_REGISTRY + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schemaMetadata) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(mockAspectRetriever); + testOutput = + test.applyMCPSideEffect(List.of(schemaMetadataChangeItem), retrieverContext).toList(); + + // Verify test + switch (changeType) { + default -> { + assertEquals( + testOutput.size(), 2, "Unexpected output items for changeType:" + changeType); + + assertEquals( + testOutput, + List.of( + ChangeItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)")) + .aspectName(SCHEMA_FIELD_KEY_ASPECT) + .changeType(changeType) + .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) + .aspectSpec( + TEST_REGISTRY + .getEntitySpec(SCHEMA_FIELD_ENTITY_NAME) + .getAspectSpec(SCHEMA_FIELD_KEY_ASPECT)) + .recordTemplate( + new SchemaFieldKey() + .setFieldPath("user_id") + .setParent(schemaMetadataChangeItem.getUrn())) + .auditStamp(schemaMetadataChangeItem.getAuditStamp()) + .systemMetadata(schemaMetadataChangeItem.getSystemMetadata()) + .build(mockAspectRetriever), + ChangeItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)")) + .aspectName(SCHEMA_FIELD_KEY_ASPECT) + .changeType(changeType) + .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) + .aspectSpec( + TEST_REGISTRY + .getEntitySpec(SCHEMA_FIELD_ENTITY_NAME) + .getAspectSpec(SCHEMA_FIELD_KEY_ASPECT)) + .recordTemplate( + new SchemaFieldKey() + .setFieldPath("user_name") + .setParent(schemaMetadataChangeItem.getUrn())) + .auditStamp(schemaMetadataChangeItem.getAuditStamp()) + .systemMetadata(schemaMetadataChangeItem.getSystemMetadata()) + .build(mockAspectRetriever))); + } + } + } + } + + @Test + public void statusToSchemaFieldStatusTest() { + SchemaFieldSideEffect test = new SchemaFieldSideEffect(); + test.setConfig(TEST_PLUGIN_CONFIG); + SchemaMetadata schemaMetadata = getTestSchemaMetadata(); + Status status = new Status().setRemoved(true); + + // Case 1. schemaMetadata (exists), then status updated + reset(mockAspectRetriever); + when(mockAspectRetriever.getEntityRegistry()).thenReturn(TEST_REGISTRY); + when(mockAspectRetriever.getLatestAspectObjects( + Set.of(TEST_URN), Set.of(SCHEMA_METADATA_ASPECT_NAME))) + .thenReturn( + Map.of( + TEST_URN, Map.of(SCHEMA_METADATA_ASPECT_NAME, new Aspect(schemaMetadata.data())))); + + List testOutput; + for (ChangeType changeType : List.of(ChangeType.CREATE, ChangeType.UPSERT)) { + // Run Status test + ChangeItemImpl statusChangeItem = + ChangeItemImpl.builder() + .urn(TEST_URN) + .aspectName(STATUS_ASPECT_NAME) + .changeType(changeType) + .entitySpec(TEST_REGISTRY.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectSpec( + TEST_REGISTRY + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(STATUS_ASPECT_NAME)) + .recordTemplate(status) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(mockAspectRetriever); + testOutput = test.applyMCPSideEffect(List.of(statusChangeItem), retrieverContext).toList(); + + // Verify test + switch (changeType) { + default -> { + assertEquals( + testOutput.size(), 2, "Unexpected output items for changeType:" + changeType); + + assertEquals( + testOutput, + List.of( + ChangeItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)")) + .aspectName(STATUS_ASPECT_NAME) + .changeType(changeType) + .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) + .aspectSpec( + TEST_REGISTRY + .getEntitySpec(SCHEMA_FIELD_ENTITY_NAME) + .getAspectSpec(STATUS_ASPECT_NAME)) + .recordTemplate(status) + .auditStamp(statusChangeItem.getAuditStamp()) + .systemMetadata(statusChangeItem.getSystemMetadata()) + .build(mockAspectRetriever), + ChangeItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)")) + .aspectName(STATUS_ASPECT_NAME) + .changeType(changeType) + .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) + .aspectSpec( + TEST_REGISTRY + .getEntitySpec(SCHEMA_FIELD_ENTITY_NAME) + .getAspectSpec(STATUS_ASPECT_NAME)) + .recordTemplate(status) + .auditStamp(statusChangeItem.getAuditStamp()) + .systemMetadata(statusChangeItem.getSystemMetadata()) + .build(mockAspectRetriever))); + } + } + } + + // Case 2. status (exists), then schemaMetadata + reset(mockAspectRetriever); + when(mockAspectRetriever.getEntityRegistry()).thenReturn(TEST_REGISTRY); + when(mockAspectRetriever.getLatestAspectObjects(Set.of(TEST_URN), Set.of(STATUS_ASPECT_NAME))) + .thenReturn(Map.of(TEST_URN, Map.of(STATUS_ASPECT_NAME, new Aspect(status.data())))); + + for (ChangeType changeType : List.of(ChangeType.CREATE, ChangeType.UPSERT)) { + // Run test + ChangeItemImpl schemaMetadataChangeItem = + ChangeItemImpl.builder() + .urn(TEST_URN) + .aspectName(SCHEMA_METADATA_ASPECT_NAME) + .changeType(changeType) + .entitySpec(TEST_REGISTRY.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectSpec( + TEST_REGISTRY + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schemaMetadata) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(mockAspectRetriever); + testOutput = + test.applyMCPSideEffect(List.of(schemaMetadataChangeItem), retrieverContext).toList(); + + // Verify test + switch (changeType) { + default -> { + assertEquals( + testOutput.size(), 4, "Unexpected output items for changeType:" + changeType); + + assertEquals( + testOutput, + List.of( + ChangeItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)")) + .aspectName(SCHEMA_FIELD_KEY_ASPECT) + .changeType(changeType) + .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) + .aspectSpec( + TEST_REGISTRY + .getEntitySpec(SCHEMA_FIELD_ENTITY_NAME) + .getAspectSpec(SCHEMA_FIELD_KEY_ASPECT)) + .recordTemplate( + new SchemaFieldKey() + .setFieldPath("user_id") + .setParent(schemaMetadataChangeItem.getUrn())) + .auditStamp(schemaMetadataChangeItem.getAuditStamp()) + .systemMetadata(schemaMetadataChangeItem.getSystemMetadata()) + .build(mockAspectRetriever), + ChangeItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)")) + .aspectName(SCHEMA_FIELD_KEY_ASPECT) + .changeType(changeType) + .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) + .aspectSpec( + TEST_REGISTRY + .getEntitySpec(SCHEMA_FIELD_ENTITY_NAME) + .getAspectSpec(SCHEMA_FIELD_KEY_ASPECT)) + .recordTemplate( + new SchemaFieldKey() + .setFieldPath("user_name") + .setParent(schemaMetadataChangeItem.getUrn())) + .auditStamp(schemaMetadataChangeItem.getAuditStamp()) + .systemMetadata(schemaMetadataChangeItem.getSystemMetadata()) + .build(mockAspectRetriever), + ChangeItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)")) + .aspectName(STATUS_ASPECT_NAME) + .changeType(changeType) + .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) + .aspectSpec( + TEST_REGISTRY + .getEntitySpec(SCHEMA_FIELD_ENTITY_NAME) + .getAspectSpec(STATUS_ASPECT_NAME)) + .recordTemplate(status) + .auditStamp(schemaMetadataChangeItem.getAuditStamp()) + .systemMetadata(schemaMetadataChangeItem.getSystemMetadata()) + .build(mockAspectRetriever), + ChangeItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)")) + .aspectName(STATUS_ASPECT_NAME) + .changeType(changeType) + .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) + .aspectSpec( + TEST_REGISTRY + .getEntitySpec(SCHEMA_FIELD_ENTITY_NAME) + .getAspectSpec(STATUS_ASPECT_NAME)) + .recordTemplate(status) + .auditStamp(schemaMetadataChangeItem.getAuditStamp()) + .systemMetadata(schemaMetadataChangeItem.getSystemMetadata()) + .build(mockAspectRetriever))); + } + } + } + } + + @Test + public void schemaMetadataDeleteTest() { + SchemaFieldSideEffect test = new SchemaFieldSideEffect(); + test.setConfig(TEST_PLUGIN_CONFIG); + SchemaMetadata schemaMetadata = getTestSchemaMetadata(); + + // Run test + MCLItem schemaMetadataChangeItem = + MCLItemImpl.builder() + .metadataChangeLog( + new MetadataChangeLog() + .setChangeType(ChangeType.DELETE) + .setEntityUrn(TEST_URN) + .setEntityType(DATASET_ENTITY_NAME) + .setAspectName(SCHEMA_METADATA_ASPECT_NAME) + .setPreviousAspectValue(GenericRecordUtils.serializeAspect(schemaMetadata)) + .setCreated(AuditStampUtils.createDefaultAuditStamp())) + .build(retrieverContext.getAspectRetriever()); + + List testOutput = + test.postMCPSideEffect(List.of(schemaMetadataChangeItem), retrieverContext).toList(); + + List expectedEveryAspectPerField = new ArrayList<>(); + for (String schemaField : + List.of( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)")) { + for (AspectSpec aspectSpec : + TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME).getAspectSpecs()) { + expectedEveryAspectPerField.add( + DeleteItemImpl.builder() + .urn(UrnUtils.getUrn(schemaField)) + .aspectName(aspectSpec.getName()) + .auditStamp(schemaMetadataChangeItem.getAuditStamp()) + .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) + .aspectSpec(aspectSpec) + .build(retrieverContext.getAspectRetriever())); + } + } + + assertEquals( + testOutput.size(), + expectedEveryAspectPerField.size(), + "Unexpected output items for changeType:" + ChangeType.DELETE); + assertEquals( + testOutput.stream() + .filter(item -> item.getAspectName().equals(SCHEMA_FIELD_KEY_ASPECT)) + .count(), + 2, + "Expected both key aspects"); + + assertEquals(testOutput, expectedEveryAspectPerField); + } + + @Test + public void statusDeleteTest() { + SchemaFieldSideEffect test = new SchemaFieldSideEffect(); + test.setConfig(TEST_PLUGIN_CONFIG); + Status status = new Status().setRemoved(false); + + // mock response + reset(mockAspectRetriever); + when(mockAspectRetriever.getEntityRegistry()).thenReturn(TEST_REGISTRY); + when(mockAspectRetriever.getLatestAspectObjects( + Set.of(TEST_URN), Set.of(SCHEMA_METADATA_ASPECT_NAME))) + .thenReturn( + Map.of( + TEST_URN, + Map.of(SCHEMA_METADATA_ASPECT_NAME, new Aspect(getTestSchemaMetadata().data())))); + + // Run test + MCLItem statusChangeItem = + MCLItemImpl.builder() + .metadataChangeLog( + new MetadataChangeLog() + .setChangeType(ChangeType.DELETE) + .setEntityUrn(TEST_URN) + .setEntityType(DATASET_ENTITY_NAME) + .setAspectName(STATUS_ASPECT_NAME) + .setPreviousAspectValue(GenericRecordUtils.serializeAspect(status)) + .setCreated(AuditStampUtils.createDefaultAuditStamp())) + .build(retrieverContext.getAspectRetriever()); + + List testOutput = + test.postMCPSideEffect(List.of(statusChangeItem), retrieverContext).toList(); + + List expectedStatusDeletePerField = new ArrayList<>(); + for (String schemaField : + List.of( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)")) { + for (AspectSpec aspectSpec : + List.of( + TEST_REGISTRY + .getEntitySpec(SCHEMA_FIELD_ENTITY_NAME) + .getAspectSpec(STATUS_ASPECT_NAME))) { + expectedStatusDeletePerField.add( + DeleteItemImpl.builder() + .urn(UrnUtils.getUrn(schemaField)) + .aspectName(aspectSpec.getName()) + .auditStamp(statusChangeItem.getAuditStamp()) + .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) + .aspectSpec(aspectSpec) + .build(retrieverContext.getAspectRetriever())); + } + } + + assertEquals( + testOutput.size(), + expectedStatusDeletePerField.size(), + "Unexpected output items for changeType:" + ChangeType.DELETE); + assertEquals( + testOutput.stream().filter(item -> item.getAspectName().equals(STATUS_ASPECT_NAME)).count(), + 2, + "Expected both status aspects"); + assertEquals(testOutput, expectedStatusDeletePerField); + } + + private static SchemaMetadata getTestSchemaMetadata() { + String rawSchemaMetadataString = + "{\"foreignKeys\":[{\"name\":\"user id\",\"sourceFields\":[\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)\"],\"foreignFields\":[\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD),user_id)\"],\"foreignDataset\":\"urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)\"}],\"platformSchema\":{\"com.linkedin.schema.KafkaSchema\":{\"documentSchemaType\":\"AVRO\",\"documentSchema\":\"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"SampleHiveSchema\\\",\\\"namespace\\\":\\\"com.linkedin.dataset\\\",\\\"doc\\\":\\\"Sample Hive dataset\\\",\\\"fields\\\":[{\\\"name\\\":\\\"field_foo\\\",\\\"type\\\":[\\\"string\\\"]},{\\\"name\\\":\\\"field_bar\\\",\\\"type\\\":[\\\"boolean\\\"]}]}\"}},\"created\":{\"actor\":\"urn:li:corpuser:jdoe\",\"time\":1674291843000},\"lastModified\":{\"actor\":\"urn:li:corpuser:jdoe\",\"time\":1674291843000},\"fields\":[{\"nullable\":false,\"fieldPath\":\"user_id\",\"description\":\"Id of the user created\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.BooleanType\":{}}},\"recursive\":false,\"nativeDataType\":\"varchar(100)\"},{\"nullable\":false,\"fieldPath\":\"user_name\",\"description\":\"Name of the user who signed up\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.BooleanType\":{}}},\"recursive\":false,\"nativeDataType\":\"boolean\"}],\"schemaName\":\"SampleHiveSchema\",\"version\":0,\"hash\":\"\",\"platform\":\"urn:li:dataPlatform:hive\"}"; + ByteString rawSchemaMetadataBytes = + ByteString.copyString(rawSchemaMetadataString, StandardCharsets.UTF_8); + return GenericRecordUtils.deserializeAspect( + rawSchemaMetadataBytes, "application/json", SchemaMetadata.class); + } +} diff --git a/metadata-models/src/main/resources/entity-registry.yml b/metadata-models/src/main/resources/entity-registry.yml index 6006ca179d162..56ee076003232 100644 --- a/metadata-models/src/main/resources/entity-registry.yml +++ b/metadata-models/src/main/resources/entity-registry.yml @@ -454,6 +454,7 @@ entities: - structuredProperties - forms - businessAttributes + - status - name: globalSettings doc: Global settings for an the platform category: internal @@ -644,6 +645,21 @@ plugins: aspectName: propertyDefinition - entityName: structuredProperty aspectName: structuredPropertyKey + - className: 'com.linkedin.metadata.schemafields.sideeffects.SchemaFieldSideEffect' + packageScan: + - 'com.linkedin.metadata.schemafields.sideeffects' + enabled: true + supportedOperations: + - CREATE + - CREATE_ENTITY + - UPSERT + - RESTATE + - DELETE + supportedEntityAspectNames: + - entityName: dataset + aspectName: status + - entityName: dataset + aspectName: schemaMetadata mutationHooks: - className: 'com.linkedin.metadata.structuredproperties.hooks.StructuredPropertiesSoftDelete' packageScan: diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/context/SystemOperationContextFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/context/SystemOperationContextFactory.java index 6870d10c87001..9d92468ce649f 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/context/SystemOperationContextFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/context/SystemOperationContextFactory.java @@ -17,7 +17,6 @@ import io.datahubproject.metadata.context.ServicesRegistryContext; import io.datahubproject.metadata.services.RestrictedService; import javax.annotation.Nonnull; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -26,10 +25,6 @@ @Configuration public class SystemOperationContextFactory { - @Autowired - @Qualifier("baseElasticSearchComponents") - private BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components; - /** * Used inside GMS * @@ -45,7 +40,9 @@ protected OperationContext javaSystemOperationContext( @Nonnull final EntityService entityService, @Nonnull final RestrictedService restrictedService, @Nonnull final GraphRetriever graphRetriever, - @Nonnull final SearchService searchService) { + @Nonnull final SearchService searchService, + @Qualifier("baseElasticSearchComponents") + BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components) { EntityServiceAspectRetriever entityServiceAspectRetriever = EntityServiceAspectRetriever.builder() @@ -91,7 +88,9 @@ protected OperationContext restliSystemOperationContext( @Nonnull final OperationContextConfig operationContextConfig, @Nonnull final RestrictedService restrictedService, @Nonnull final GraphRetriever graphRetriever, - @Nonnull final SearchService searchService) { + @Nonnull final SearchService searchService, + @Qualifier("baseElasticSearchComponents") + BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components) { EntityClientAspectRetriever entityServiceAspectRetriever = EntityClientAspectRetriever.builder().entityClient(systemEntityClient).build(); diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/restoreindices/RestoreIndicesArgs.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/restoreindices/RestoreIndicesArgs.java index 89e69174c1502..cae8c97456c06 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/restoreindices/RestoreIndicesArgs.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/restoreindices/RestoreIndicesArgs.java @@ -1,6 +1,8 @@ package com.linkedin.metadata.entity.restoreindices; import java.time.Instant; +import java.util.Collections; +import java.util.List; import lombok.Data; import lombok.experimental.Accessors; @@ -20,6 +22,7 @@ public class RestoreIndicesArgs implements Cloneable { public long gePitEpochMs = DEFAULT_GE_PIT_EPOCH_MS; public long lePitEpochMs; public String aspectName; + public List aspectNames = Collections.emptyList(); public String urn; public String urnLike; public Boolean urnBasedPagination = false;