From 54841e9eb3835bec078015843b842a1082733104 Mon Sep 17 00:00:00 2001 From: David Leifker Date: Wed, 7 Feb 2024 14:01:18 -0600 Subject: [PATCH] misc: datahub-upgrade improvements, aspect key & default aspects fixes --- .../upgrade/UpgradeCliApplication.java | 10 ++- .../config/BackfillBrowsePathsV2Config.java | 10 ++- .../ReindexDataJobViaNodesCLLConfig.java | 8 +- .../upgrade/config/SystemUpdateCondition.java | 14 ++++ .../upgrade/config/SystemUpdateConfig.java | 23 ++++++ .../entity/steps/BackfillBrowsePathsV2.java | 16 +++- .../steps/BackfillBrowsePathsV2Step.java | 66 +++++++++++----- .../system/via/ReindexDataJobViaNodesCLL.java | 9 ++- .../via/ReindexDataJobViaNodesCLLStep.java | 28 ++++--- .../DatahubUpgradeNoSchemaRegistryTest.java | 24 +++++- ...pgradeCliApplicationTestConfiguration.java | 17 ++++- .../com/linkedin/metadata/EventUtils.java | 2 +- .../metadata/entity/EntityServiceImpl.java | 39 +++++++--- .../metadata/entity/EntityServiceTest.java | 48 +++++++++++- .../src/main/resources/application.yml | 8 ++ .../factory/entity/EntityServiceFactory.java | 19 +---- .../DUHESchemaRegistryFactory.java | 40 ---------- .../InternalSchemaRegistryFactory.java | 12 --- .../SchemaRegistryServiceFactory.java | 20 +++++ .../SystemUpdateSchemaRegistryFactory.java | 66 ++++++++++++++++ .../linkedin/metadata/boot/BootstrapStep.java | 21 +---- .../boot/kafka/MockDUHESerializer.java | 57 -------------- ...java => MockSystemUpdateDeserializer.java} | 49 ++++++------ .../kafka/MockSystemUpdateSerializer.java | 76 +++++++++++++++++++ .../resources/entity/AspectResourceTest.java | 2 +- 25 files changed, 451 insertions(+), 233 deletions(-) create mode 100644 datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateCondition.java delete mode 100644 metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/DUHESchemaRegistryFactory.java create mode 100644 metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SchemaRegistryServiceFactory.java create mode 100644 metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SystemUpdateSchemaRegistryFactory.java delete mode 100644 metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHESerializer.java rename metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/{MockDUHEDeserializer.java => MockSystemUpdateDeserializer.java} (57%) create mode 100644 metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateSerializer.java diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java index ff8bd542fbdff..50847da07be73 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java @@ -2,6 +2,10 @@ import com.linkedin.gms.factory.auth.AuthorizerChainFactory; import com.linkedin.gms.factory.auth.DataHubAuthorizerFactory; +import com.linkedin.gms.factory.graphql.GraphQLEngineFactory; +import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory; +import com.linkedin.gms.factory.kafka.SimpleKafkaConsumerFactory; +import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory; import com.linkedin.gms.factory.telemetry.ScheduledAnalyticsFactory; import org.springframework.boot.WebApplicationType; import 
org.springframework.boot.autoconfigure.SpringBootApplication; @@ -24,7 +28,11 @@ classes = { ScheduledAnalyticsFactory.class, AuthorizerChainFactory.class, - DataHubAuthorizerFactory.class + DataHubAuthorizerFactory.class, + SimpleKafkaConsumerFactory.class, + KafkaEventConsumerFactory.class, + InternalSchemaRegistryFactory.class, + GraphQLEngineFactory.class }) }) public class UpgradeCliApplication { diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java index 406963c58fd71..2b2f4648f76e7 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java @@ -3,6 +3,7 @@ import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.search.SearchService; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -11,7 +12,12 @@ public class BackfillBrowsePathsV2Config { @Bean public BackfillBrowsePathsV2 backfillBrowsePathsV2( - EntityService entityService, SearchService searchService) { - return new BackfillBrowsePathsV2(entityService, searchService); + EntityService entityService, + SearchService searchService, + @Value("${systemUpdate.browsePathsV2.enabled}") final boolean enabled, + @Value("${systemUpdate.browsePathsV2.reprocess.enabled}") final boolean reprocessEnabled, + @Value("${systemUpdate.browsePathsV2.batchSize}") final Integer batchSize) { + return new BackfillBrowsePathsV2( + entityService, searchService, enabled, reprocessEnabled, batchSize); } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java index 06311e1853874..83dad80944f5f 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java @@ -2,6 +2,7 @@ import com.linkedin.datahub.upgrade.system.via.ReindexDataJobViaNodesCLL; import com.linkedin.metadata.entity.EntityService; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -9,7 +10,10 @@ public class ReindexDataJobViaNodesCLLConfig { @Bean - public ReindexDataJobViaNodesCLL _reindexDataJobViaNodesCLL(EntityService entityService) { - return new ReindexDataJobViaNodesCLL(entityService); + public ReindexDataJobViaNodesCLL _reindexDataJobViaNodesCLL( + EntityService entityService, + @Value("${systemUpdate.dataJobNodeCLL.enabled}") final boolean enabled, + @Value("${systemUpdate.dataJobNodeCLL.batchSize}") final Integer batchSize) { + return new ReindexDataJobViaNodesCLL(entityService, enabled, batchSize); } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateCondition.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateCondition.java new file mode 100644 index 0000000000000..ea432dfa9f7df --- /dev/null +++ 
b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateCondition.java @@ -0,0 +1,14 @@ +package com.linkedin.datahub.upgrade.config; + +import org.springframework.boot.ApplicationArguments; +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.core.type.AnnotatedTypeMetadata; + +public class SystemUpdateCondition implements Condition { + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { + return context.getBeanFactory().getBean(ApplicationArguments.class).getNonOptionArgs().stream() + .anyMatch("SystemUpdate"::equals); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java index 177d4b531ba86..cde3a29248fb5 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java @@ -8,6 +8,7 @@ import com.linkedin.gms.factory.common.TopicConventionFactory; import com.linkedin.gms.factory.config.ConfigurationProvider; import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory; +import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory; import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig; import com.linkedin.metadata.config.kafka.KafkaConfiguration; import com.linkedin.metadata.dao.producer.KafkaEventProducer; @@ -21,9 +22,12 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; @Slf4j @Configuration @@ -74,4 +78,23 @@ protected KafkaEventProducer duheKafkaEventProducer( duheSchemaRegistryConfig, kafkaConfiguration, properties)); return new KafkaEventProducer(producer, topicConvention, kafkaHealthChecker); } + + /** + * The ReindexDataJobViaNodesCLLConfig step requires publishing to MCL. Overriding the default + * producer with this special producer which doesn't require an active registry. + * + *
<p>
Use when the schema registry type is INTERNAL and the process is running as SystemUpdate. + * + *
<p>
This forces this producer into the EntityService + */ + @Primary + @Bean(name = "kafkaEventProducer") + @Conditional(SystemUpdateCondition.class) + @ConditionalOnProperty( + name = "kafka.schemaRegistry.type", + havingValue = InternalSchemaRegistryFactory.TYPE) + protected KafkaEventProducer kafkaEventProducer( + @Qualifier("duheKafkaEventProducer") KafkaEventProducer kafkaEventProducer) { + return kafkaEventProducer; + } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java index 4b9fc5bba0204..9b023e1e239a2 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java @@ -11,8 +11,20 @@ public class BackfillBrowsePathsV2 implements Upgrade { private final List _steps; - public BackfillBrowsePathsV2(EntityService entityService, SearchService searchService) { - _steps = ImmutableList.of(new BackfillBrowsePathsV2Step(entityService, searchService)); + public BackfillBrowsePathsV2( + EntityService entityService, + SearchService searchService, + boolean enabled, + boolean reprocessEnabled, + Integer batchSize) { + if (enabled) { + _steps = + ImmutableList.of( + new BackfillBrowsePathsV2Step( + entityService, searchService, reprocessEnabled, batchSize)); + } else { + _steps = ImmutableList.of(); + } } @Override diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java index 601ce4d25493c..2d64e0052ae82 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java @@ -16,6 +16,7 @@ import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil; +import com.linkedin.metadata.boot.BootstrapStep; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.query.SearchFlags; import com.linkedin.metadata.query.filter.Condition; @@ -37,9 +38,8 @@ @Slf4j public class BackfillBrowsePathsV2Step implements UpgradeStep { - public static final String BACKFILL_BROWSE_PATHS_V2 = "BACKFILL_BROWSE_PATHS_V2"; - public static final String REPROCESS_DEFAULT_BROWSE_PATHS_V2 = - "REPROCESS_DEFAULT_BROWSE_PATHS_V2"; + private static final String UPGRADE_ID = "BackfillBrowsePathsV2Step"; + private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID); public static final String DEFAULT_BROWSE_PATH_V2 = "␟Default"; private static final Set ENTITY_TYPES_TO_MIGRATE = @@ -53,14 +53,22 @@ public class BackfillBrowsePathsV2Step implements UpgradeStep { Constants.ML_MODEL_GROUP_ENTITY_NAME, Constants.ML_FEATURE_TABLE_ENTITY_NAME, Constants.ML_FEATURE_ENTITY_NAME); - private static final Integer BATCH_SIZE = 5000; - private final EntityService _entityService; - private final SearchService _searchService; - - public BackfillBrowsePathsV2Step(EntityService entityService, SearchService searchService) { - _searchService = searchService; - _entityService = entityService; + private final EntityService entityService; + 
private final SearchService searchService; + + private final boolean reprocessEnabled; + private final Integer batchSize; + + public BackfillBrowsePathsV2Step( + EntityService entityService, + SearchService searchService, + boolean reprocessEnabled, + Integer batchSize) { + this.searchService = searchService; + this.entityService = entityService; + this.reprocessEnabled = reprocessEnabled; + this.batchSize = batchSize; } @Override @@ -78,11 +86,14 @@ public Function executable() { log.info( String.format( "Upgrading batch %s-%s of browse paths for entity type %s", - migratedCount, migratedCount + BATCH_SIZE, entityType)); + migratedCount, migratedCount + batchSize, entityType)); scrollId = backfillBrowsePathsV2(entityType, auditStamp, scrollId); - migratedCount += BATCH_SIZE; + migratedCount += batchSize; } while (scrollId != null); } + + BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, entityService); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); }; } @@ -91,27 +102,27 @@ private String backfillBrowsePathsV2(String entityType, AuditStamp auditStamp, S final Filter filter; - if (System.getenv().containsKey(REPROCESS_DEFAULT_BROWSE_PATHS_V2) - && Boolean.parseBoolean(System.getenv(REPROCESS_DEFAULT_BROWSE_PATHS_V2))) { + if (reprocessEnabled) { filter = backfillDefaultBrowsePathsV2Filter(); } else { filter = backfillBrowsePathsV2Filter(); } final ScrollResult scrollResult = - _searchService.scrollAcrossEntities( + searchService.scrollAcrossEntities( ImmutableList.of(entityType), "*", filter, null, scrollId, null, - BATCH_SIZE, + batchSize, new SearchFlags() .setFulltext(true) .setSkipCache(true) .setSkipHighlighting(true) .setSkipAggregates(true)); + if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) { return null; } @@ -183,7 +194,7 @@ private Filter backfillDefaultBrowsePathsV2Filter() { private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exception { BrowsePathsV2 browsePathsV2 = - DefaultAspectsUtil.buildDefaultBrowsePathV2(urn, true, _entityService); + DefaultAspectsUtil.buildDefaultBrowsePathV2(urn, true, entityService); log.debug(String.format("Adding browse path v2 for urn %s with value %s", urn, browsePathsV2)); MetadataChangeProposal proposal = new MetadataChangeProposal(); proposal.setEntityUrn(urn); @@ -193,12 +204,12 @@ private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exceptio proposal.setSystemMetadata( new SystemMetadata().setRunId(DEFAULT_RUN_ID).setLastObserved(System.currentTimeMillis())); proposal.setAspect(GenericRecordUtils.serializeAspect(browsePathsV2)); - _entityService.ingestProposal(proposal, auditStamp, true); + entityService.ingestProposal(proposal, auditStamp, true); } @Override public String id() { - return "BackfillBrowsePathsV2Step"; + return UPGRADE_ID; } /** @@ -211,7 +222,22 @@ public boolean isOptional() { } @Override + /** + * Returns whether the upgrade should be skipped. Uses previous run history or the environment + * variables REPROCESS_DEFAULT_BROWSE_PATHS_V2 & BACKFILL_BROWSE_PATHS_V2 to determine whether to + * skip. + */ public boolean skip(UpgradeContext context) { - return !Boolean.parseBoolean(System.getenv(BACKFILL_BROWSE_PATHS_V2)); + boolean envEnabled = Boolean.parseBoolean(System.getenv("BACKFILL_BROWSE_PATHS_V2")); + + if (reprocessEnabled && envEnabled) { + return false; + } + + boolean previouslyRun = entityService.exists(UPGRADE_ID_URN, true); + if (previouslyRun) { + log.info("{} was already run. 
Skipping.", id()); + } + return (previouslyRun || !envEnabled); } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLL.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLL.java index 41179a50c4b54..59975693322d1 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLL.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLL.java @@ -18,8 +18,13 @@ public class ReindexDataJobViaNodesCLL implements Upgrade { private final List _steps; - public ReindexDataJobViaNodesCLL(EntityService entityService) { - _steps = ImmutableList.of(new ReindexDataJobViaNodesCLLStep(entityService)); + public ReindexDataJobViaNodesCLL( + EntityService entityService, boolean enabled, Integer batchSize) { + if (enabled) { + _steps = ImmutableList.of(new ReindexDataJobViaNodesCLLStep(entityService, batchSize)); + } else { + _steps = ImmutableList.of(); + } } @Override diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLLStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLLStep.java index 70afbc3d205b2..56166caf5b57e 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLLStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLLStep.java @@ -11,7 +11,6 @@ import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult; -import java.net.URISyntaxException; import java.util.function.Function; import lombok.extern.slf4j.Slf4j; @@ -21,12 +20,12 @@ public class ReindexDataJobViaNodesCLLStep implements UpgradeStep { private static final String UPGRADE_ID = "via-node-cll-reindex-datajob"; private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID); - private static final Integer BATCH_SIZE = 5000; + private final EntityService entityService; + private final Integer batchSize; - private final EntityService _entityService; - - public ReindexDataJobViaNodesCLLStep(EntityService entityService) { - _entityService = entityService; + public ReindexDataJobViaNodesCLLStep(EntityService entityService, Integer batchSize) { + this.entityService = entityService; + this.batchSize = batchSize; } @Override @@ -35,17 +34,16 @@ public Function executable() { RestoreIndicesArgs args = new RestoreIndicesArgs() .setAspectName(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME) - .setUrnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%"); + .setUrnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%") + .setBatchSize(batchSize); RestoreIndicesResult result = - _entityService.restoreIndices(args, x -> context.report().addLine((String) x)); + entityService.restoreIndices(args, x -> context.report().addLine((String) x)); context.report().addLine("Rows migrated: " + result.rowsMigrated); context.report().addLine("Rows ignored: " + result.ignored); - try { - BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, _entityService); - context.report().addLine("State updated: " + UPGRADE_ID_URN); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } + + BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, entityService); + context.report().addLine("State updated: " + UPGRADE_ID_URN); + return new 
DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); }; } @@ -70,7 +68,7 @@ public boolean isOptional() { * variable SKIP_REINDEX_DATA_JOB_INPUT_OUTPUT to determine whether to skip. */ public boolean skip(UpgradeContext context) { - boolean previouslyRun = _entityService.exists(UPGRADE_ID_URN, true); + boolean previouslyRun = entityService.exists(UPGRADE_ID_URN, true); boolean envFlagRecommendsSkip = Boolean.parseBoolean(System.getenv("SKIP_REINDEX_DATA_JOB_INPUT_OUTPUT")); if (previouslyRun) { diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java index 83b8e028727ce..4c9e12c0ed151 100644 --- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java @@ -4,6 +4,8 @@ import static org.testng.AssertJUnit.assertNotNull; import com.linkedin.datahub.upgrade.system.SystemUpdate; +import com.linkedin.metadata.dao.producer.KafkaEventProducer; +import com.linkedin.metadata.entity.EntityServiceImpl; import java.util.List; import java.util.Map; import java.util.Optional; @@ -19,19 +21,37 @@ classes = {UpgradeCliApplication.class, UpgradeCliApplicationTestConfiguration.class}, properties = { "kafka.schemaRegistry.type=INTERNAL", - "DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=test_due_topic" - }) + "DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=test_due_topic", + "METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME=test_mcl_versioned_topic" + }, + args = {"-u", "SystemUpdate"}) public class DatahubUpgradeNoSchemaRegistryTest extends AbstractTestNGSpringContextTests { @Autowired @Named("systemUpdate") private SystemUpdate systemUpdate; + @Autowired + @Named("kafkaEventProducer") + private KafkaEventProducer kafkaEventProducer; + + @Autowired + @Named("duheKafkaEventProducer") + private KafkaEventProducer duheKafkaEventProducer; + + @Autowired private EntityServiceImpl entityService; + @Test public void testSystemUpdateInit() { assertNotNull(systemUpdate); } + @Test + public void testSystemUpdateKafkaProducerOverride() { + assertEquals(kafkaEventProducer, duheKafkaEventProducer); + assertEquals(entityService.get_producer(), duheKafkaEventProducer); + } + @Test public void testSystemUpdateSend() { UpgradeStepResult.Result result = diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java index be28b7f739cf5..5c2d6fff0f07c 100644 --- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java @@ -1,15 +1,21 @@ package com.linkedin.datahub.upgrade; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import com.linkedin.gms.factory.auth.SystemAuthenticationFactory; -import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.graph.GraphService; import com.linkedin.metadata.models.registry.ConfigEntityRegistry; import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.registry.SchemaRegistryService; import com.linkedin.metadata.search.SearchService; import 
com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders; import io.ebean.Database; +import java.util.Optional; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; @TestConfiguration @@ -20,8 +26,6 @@ public class UpgradeCliApplicationTestConfiguration { @MockBean private Database ebeanServer; - @MockBean private EntityService _entityService; - @MockBean private SearchService searchService; @MockBean private GraphService graphService; @@ -31,4 +35,11 @@ public class UpgradeCliApplicationTestConfiguration { @MockBean ConfigEntityRegistry configEntityRegistry; @MockBean public EntityIndexBuilders entityIndexBuilders; + + @Bean + public SchemaRegistryService schemaRegistryService() { + SchemaRegistryService mockService = mock(SchemaRegistryService.class); + when(mockService.getSchemaIdForTopic(anyString())).thenReturn(Optional.of(0)); + return mockService; + } } diff --git a/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java b/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java index 645c2fe210e09..adff32d5d336d 100644 --- a/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java +++ b/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java @@ -57,7 +57,7 @@ public class EventUtils { private static final Schema ORIGINAL_MCP_AVRO_SCHEMA = getAvroSchemaFromResource("avro/com/linkedin/mxe/MetadataChangeProposal.avsc"); - private static final Schema ORIGINAL_MCL_AVRO_SCHEMA = + public static final Schema ORIGINAL_MCL_AVRO_SCHEMA = getAvroSchemaFromResource("avro/com/linkedin/mxe/MetadataChangeLog.avsc"); private static final Schema ORIGINAL_FMCL_AVRO_SCHEMA = diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 7f15e3a7fd8fc..eec5c6120886d 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -15,6 +15,7 @@ import com.codahale.metrics.Timer; import com.datahub.util.RecordUtils; import com.datahub.util.exception.ModelConversionException; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterators; @@ -146,7 +147,8 @@ public class EntityServiceImpl implements EntityService { private static final int DEFAULT_MAX_TRANSACTION_RETRY = 3; protected final AspectDao _aspectDao; - private final EventProducer _producer; + + @VisibleForTesting @Getter private final EventProducer _producer; private final EntityRegistry _entityRegistry; private final Map> _entityToValidAspects; private RetentionService _retentionService; @@ -637,10 +639,15 @@ public List ingestAspects( @Override public List ingestAspects( @Nonnull final AspectsBatch aspectsBatch, boolean emitMCL, boolean overwrite) { + Set items = new HashSet<>(aspectsBatch.getItems()); + + // Generate additional items as needed + items.addAll(DefaultAspectsUtil.getAdditionalChanges(aspectsBatch, this, enableBrowseV2)); + AspectsBatch withDefaults = AspectsBatchImpl.builder().items(items).build(); Timer.Context ingestToLocalDBTimer = MetricUtils.timer(this.getClass(), 
"ingestAspectsToLocalDB").time(); - List ingestResults = ingestAspectsToLocalDB(aspectsBatch, overwrite); + List ingestResults = ingestAspectsToLocalDB(withDefaults, overwrite); List mclResults = emitMCL(ingestResults, emitMCL); ingestToLocalDBTimer.stop(); @@ -964,7 +971,7 @@ public IngestResult ingestProposal( */ @Override public Set ingestProposal(AspectsBatch aspectsBatch, final boolean async) { - Stream timeseriesIngestResults = ingestTimeseriesProposal(aspectsBatch); + Stream timeseriesIngestResults = ingestTimeseriesProposal(aspectsBatch, async); Stream nonTimeseriesIngestResults = async ? ingestProposalAsync(aspectsBatch) : ingestProposalSync(aspectsBatch); @@ -978,7 +985,8 @@ public Set ingestProposal(AspectsBatch aspectsBatch, final boolean * @param aspectsBatch timeseries upserts batch * @return returns ingest proposal result, however was never in the MCP topic */ - private Stream ingestTimeseriesProposal(AspectsBatch aspectsBatch) { + private Stream ingestTimeseriesProposal( + AspectsBatch aspectsBatch, final boolean async) { List unsupported = aspectsBatch.getItems().stream() .filter( @@ -992,6 +1000,20 @@ private Stream ingestTimeseriesProposal(AspectsBatch aspectsBatch) + unsupported.stream().map(BatchItem::getChangeType).collect(Collectors.toSet())); } + if (!async) { + // Create default non-timeseries aspects for timeseries aspects + List timeseriesItems = + aspectsBatch.getItems().stream() + .filter(item -> item.getAspectSpec().isTimeseries()) + .collect(Collectors.toList()); + + List defaultAspects = + DefaultAspectsUtil.getAdditionalChanges( + AspectsBatchImpl.builder().items(timeseriesItems).build(), this, enableBrowseV2); + ingestProposalSync(AspectsBatchImpl.builder().items(defaultAspects).build()); + } + + // Emit timeseries MCLs List, Boolean>>>> timeseriesResults = aspectsBatch.getItems().stream() .filter(item -> item.getAspectSpec().isTimeseries()) @@ -1080,17 +1102,10 @@ private Stream ingestProposalAsync(AspectsBatch aspectsBatch) { } private Stream ingestProposalSync(AspectsBatch aspectsBatch) { - Set items = new HashSet<>(aspectsBatch.getItems()); - - // Generate additional items as needed - items.addAll(DefaultAspectsUtil.getAdditionalChanges(aspectsBatch, this, enableBrowseV2)); - - AspectsBatch withDefaults = AspectsBatchImpl.builder().items(items).build(); - AspectsBatchImpl nonTimeseries = AspectsBatchImpl.builder() .items( - withDefaults.getItems().stream() + aspectsBatch.getItems().stream() .filter(item -> !item.getAspectSpec().isTimeseries()) .collect(Collectors.toList())) .build(); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java index ea4e97d264bca..384b54c7a1c8d 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java @@ -479,7 +479,7 @@ public void testIngestAspectsGetLatestAspects() throws Exception { assertTrue(DataTemplateUtil.areEqual(writeAspect1, latestAspects.get(aspectName1))); assertTrue(DataTemplateUtil.areEqual(writeAspect2, latestAspects.get(aspectName2))); - verify(_mockProducer, times(2)) + verify(_mockProducer, times(3)) .produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), Mockito.any()); verifyNoMoreInteractions(_mockProducer); @@ -772,6 +772,12 @@ public void testUpdateGetAspect() throws AssertionError { .produceMetadataChangeLog( Mockito.eq(entityUrn), Mockito.eq(corpUserInfoSpec), 
Mockito.any()); + verify(_mockProducer, times(1)) + .produceMetadataChangeLog( + Mockito.eq(entityUrn), + Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserKey")), + Mockito.any()); + verifyNoMoreInteractions(_mockProducer); } @@ -824,6 +830,13 @@ public void testGetAspectAtVersion() throws AssertionError { readAspect1 = _entityServiceImpl.getVersionedAspect(entityUrn, aspectName, -1); assertFalse(DataTemplateUtil.areEqual(writtenVersionedAspect1, readAspect1)); + // check key aspect + verify(_mockProducer, times(1)) + .produceMetadataChangeLog( + Mockito.eq(entityUrn), + Mockito.eq(_testEntityRegistry.getEntitySpec("corpuser").getAspectSpec("corpUserKey")), + Mockito.any()); + verifyNoMoreInteractions(_mockProducer); } @@ -1094,13 +1107,22 @@ public void testIngestGetLatestAspect() throws AssertionError { ArgumentCaptor mclCaptor = ArgumentCaptor.forClass(MetadataChangeLog.class); verify(_mockProducer, times(1)) - .produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), mclCaptor.capture()); + .produceMetadataChangeLog( + Mockito.eq(entityUrn), + Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserInfo")), + mclCaptor.capture()); MetadataChangeLog mcl = mclCaptor.getValue(); assertEquals(mcl.getEntityType(), "corpuser"); assertNull(mcl.getPreviousAspectValue()); assertNull(mcl.getPreviousSystemMetadata()); assertEquals(mcl.getChangeType(), ChangeType.UPSERT); + verify(_mockProducer, times(1)) + .produceMetadataChangeLog( + Mockito.eq(entityUrn), + Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserKey")), + Mockito.any()); + verifyNoMoreInteractions(_mockProducer); reset(_mockProducer); @@ -1201,7 +1223,16 @@ public void testIngestGetLatestEnvelopedAspect() throws Exception { EntityUtils.parseSystemMetadata(readAspectDao1.getSystemMetadata()), metadata1)); verify(_mockProducer, times(2)) - .produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), Mockito.any()); + .produceMetadataChangeLog( + Mockito.eq(entityUrn), + Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserInfo")), + Mockito.any()); + + verify(_mockProducer, times(1)) + .produceMetadataChangeLog( + Mockito.eq(entityUrn), + Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserKey")), + Mockito.any()); verifyNoMoreInteractions(_mockProducer); } @@ -1234,9 +1265,18 @@ public void testIngestSameAspect() throws AssertionError { RecordTemplate readAspect1 = _entityServiceImpl.getLatestAspect(entityUrn, aspectName); assertTrue(DataTemplateUtil.areEqual(writeAspect1, readAspect1)); + verify(_mockProducer, times(1)) + .produceMetadataChangeLog( + Mockito.eq(entityUrn), + Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserKey")), + Mockito.any()); + ArgumentCaptor mclCaptor = ArgumentCaptor.forClass(MetadataChangeLog.class); verify(_mockProducer, times(1)) - .produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), mclCaptor.capture()); + .produceMetadataChangeLog( + Mockito.eq(entityUrn), + Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserInfo")), + mclCaptor.capture()); MetadataChangeLog mcl = mclCaptor.getValue(); assertEquals(mcl.getEntityType(), "corpuser"); assertNull(mcl.getPreviousAspectValue()); diff --git a/metadata-service/configuration/src/main/resources/application.yml b/metadata-service/configuration/src/main/resources/application.yml index d4c11d4aa53bd..c2a0d508b57d6 100644 --- 
a/metadata-service/configuration/src/main/resources/application.yml +++ b/metadata-service/configuration/src/main/resources/application.yml @@ -314,6 +314,14 @@ systemUpdate: maxBackOffs: ${BOOTSTRAP_SYSTEM_UPDATE_MAX_BACK_OFFS:50} backOffFactor: ${BOOTSTRAP_SYSTEM_UPDATE_BACK_OFF_FACTOR:2} # Multiplicative factor for back off, default values will result in waiting 5min 15s waitForSystemUpdate: ${BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE:true} + dataJobNodeCLL: + enabled: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED:true} + batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_BATCH_SIZE:200} + browsePathsV2: + enabled: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_ENABLED:true} + batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_BATCH_SIZE:5000} + reprocess: + enabled: ${REPROCESS_DEFAULT_BROWSE_PATHS_V2:false} structuredProperties: enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java index 871f16d97be33..2ccdee5fb1dbf 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java @@ -1,20 +1,15 @@ package com.linkedin.gms.factory.entity; import com.linkedin.datahub.graphql.featureflags.FeatureFlags; -import com.linkedin.gms.factory.common.TopicConventionFactory; import com.linkedin.gms.factory.config.ConfigurationProvider; import com.linkedin.metadata.dao.producer.KafkaEventProducer; -import com.linkedin.metadata.dao.producer.KafkaHealthChecker; import com.linkedin.metadata.entity.AspectDao; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.EntityServiceImpl; import com.linkedin.metadata.entity.ebean.batch.MCPUpsertBatchItem; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.service.UpdateIndicesService; -import com.linkedin.mxe.TopicConvention; import javax.annotation.Nonnull; -import org.apache.avro.generic.IndexedRecord; -import org.apache.kafka.clients.producer.Producer; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -28,26 +23,16 @@ public class EntityServiceFactory { private Integer _ebeanMaxTransactionRetry; @Bean(name = "entityService") - @DependsOn({ - "entityAspectDao", - "kafkaEventProducer", - "kafkaHealthChecker", - TopicConventionFactory.TOPIC_CONVENTION_BEAN, - "entityRegistry" - }) + @DependsOn({"entityAspectDao", "kafkaEventProducer", "entityRegistry"}) @Nonnull protected EntityService createInstance( - Producer producer, - TopicConvention convention, - KafkaHealthChecker kafkaHealthChecker, + @Qualifier("kafkaEventProducer") final KafkaEventProducer eventProducer, @Qualifier("entityAspectDao") AspectDao aspectDao, EntityRegistry entityRegistry, ConfigurationProvider configurationProvider, UpdateIndicesService updateIndicesService, @Value("${featureFlags.showBrowseV2}") final boolean enableBrowsePathV2) { - final KafkaEventProducer eventProducer = - new KafkaEventProducer(producer, convention, kafkaHealthChecker); FeatureFlags featureFlags = configurationProvider.getFeatureFlags(); return new EntityServiceImpl( diff --git 
a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/DUHESchemaRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/DUHESchemaRegistryFactory.java deleted file mode 100644 index 4819984307af9..0000000000000 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/DUHESchemaRegistryFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.linkedin.gms.factory.kafka.schemaregistry; - -import static com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener.TOPIC_NAME; - -import com.linkedin.gms.factory.config.ConfigurationProvider; -import com.linkedin.metadata.boot.kafka.MockDUHEDeserializer; -import com.linkedin.metadata.boot.kafka.MockDUHESerializer; -import com.linkedin.metadata.config.kafka.KafkaConfiguration; -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; -import java.util.HashMap; -import java.util.Map; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Slf4j -@Configuration -public class DUHESchemaRegistryFactory { - - public static final String DUHE_SCHEMA_REGISTRY_TOPIC_KEY = "duheTopicName"; - - @Value(TOPIC_NAME) - private String duheTopicName; - - /** Configure Kafka Producer/Consumer processes with a custom schema registry. */ - @Bean("duheSchemaRegistryConfig") - protected SchemaRegistryConfig duheSchemaRegistryConfig(ConfigurationProvider provider) { - Map props = new HashMap<>(); - KafkaConfiguration kafkaConfiguration = provider.getKafka(); - - props.put( - AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - kafkaConfiguration.getSchemaRegistry().getUrl()); - props.put(DUHE_SCHEMA_REGISTRY_TOPIC_KEY, duheTopicName); - - log.info("DataHub System Update Registry"); - return new SchemaRegistryConfig(MockDUHESerializer.class, MockDUHEDeserializer.class, props); - } -} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java index 8c814e5054758..46b27195ecc67 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java @@ -1,11 +1,7 @@ package com.linkedin.gms.factory.kafka.schemaregistry; -import com.linkedin.gms.factory.common.TopicConventionFactory; import com.linkedin.gms.factory.config.ConfigurationProvider; import com.linkedin.metadata.config.kafka.KafkaConfiguration; -import com.linkedin.metadata.registry.SchemaRegistryService; -import com.linkedin.metadata.registry.SchemaRegistryServiceImpl; -import com.linkedin.mxe.TopicConvention; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; @@ -17,7 +13,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.DependsOn; @Slf4j @Configuration @@ -45,11 +40,4 @@ protected SchemaRegistryConfig 
getInstance( kafkaConfiguration.getSchemaRegistry().getUrl()); return new SchemaRegistryConfig(KafkaAvroSerializer.class, KafkaAvroDeserializer.class, props); } - - @Bean(name = "schemaRegistryService") - @Nonnull - @DependsOn({TopicConventionFactory.TOPIC_CONVENTION_BEAN}) - protected SchemaRegistryService schemaRegistryService(TopicConvention convention) { - return new SchemaRegistryServiceImpl(convention); - } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SchemaRegistryServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SchemaRegistryServiceFactory.java new file mode 100644 index 0000000000000..a6869321d796f --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SchemaRegistryServiceFactory.java @@ -0,0 +1,20 @@ +package com.linkedin.gms.factory.kafka.schemaregistry; + +import com.linkedin.gms.factory.common.TopicConventionFactory; +import com.linkedin.metadata.registry.SchemaRegistryService; +import com.linkedin.metadata.registry.SchemaRegistryServiceImpl; +import com.linkedin.mxe.TopicConvention; +import javax.annotation.Nonnull; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; + +@Configuration +public class SchemaRegistryServiceFactory { + @Bean(name = "schemaRegistryService") + @Nonnull + @DependsOn({TopicConventionFactory.TOPIC_CONVENTION_BEAN}) + protected SchemaRegistryService schemaRegistryService(TopicConvention convention) { + return new SchemaRegistryServiceImpl(convention); + } +} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SystemUpdateSchemaRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SystemUpdateSchemaRegistryFactory.java new file mode 100644 index 0000000000000..d02cdc0e68f52 --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SystemUpdateSchemaRegistryFactory.java @@ -0,0 +1,66 @@ +package com.linkedin.gms.factory.kafka.schemaregistry; + +import static com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener.TOPIC_NAME; + +import com.linkedin.gms.factory.config.ConfigurationProvider; +import com.linkedin.metadata.boot.kafka.MockSystemUpdateDeserializer; +import com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer; +import com.linkedin.metadata.config.kafka.KafkaConfiguration; +import com.linkedin.metadata.registry.SchemaRegistryService; +import com.linkedin.mxe.Topics; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import java.util.HashMap; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Slf4j +@Configuration +public class SystemUpdateSchemaRegistryFactory { + + public static final String SYSTEM_UPDATE_TOPIC_KEY_PREFIX = "data-hub.system-update.topic-key."; + public static final String SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX = ".id"; + + public static final String DUHE_SCHEMA_REGISTRY_TOPIC_KEY = + SYSTEM_UPDATE_TOPIC_KEY_PREFIX + "duhe"; + public static final String MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY = + SYSTEM_UPDATE_TOPIC_KEY_PREFIX + "mcl-versioned"; + + @Value(TOPIC_NAME) + private String 
duheTopicName; + + @Value("${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}") + private String mclTopicName; + + /** Configure Kafka Producer/Consumer processes with a custom schema registry. */ + @Bean("duheSchemaRegistryConfig") + protected SchemaRegistryConfig duheSchemaRegistryConfig( + final ConfigurationProvider provider, final SchemaRegistryService schemaRegistryService) { + Map props = new HashMap<>(); + KafkaConfiguration kafkaConfiguration = provider.getKafka(); + + props.put( + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, + kafkaConfiguration.getSchemaRegistry().getUrl()); + + // topic names + props.putAll( + Map.of( + DUHE_SCHEMA_REGISTRY_TOPIC_KEY, duheTopicName, + MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY, mclTopicName)); + + // topic ordinals + props.putAll( + Map.of( + DUHE_SCHEMA_REGISTRY_TOPIC_KEY + SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX, + schemaRegistryService.getSchemaIdForTopic(duheTopicName).get().toString(), + MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY + SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX, + schemaRegistryService.getSchemaIdForTopic(mclTopicName).get().toString())); + + log.info("DataHub System Update Registry"); + return new SchemaRegistryConfig( + MockSystemUpdateSerializer.class, MockSystemUpdateDeserializer.class, props); + } +} diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java index a79bdacfc55e9..2dccda4243bca 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java @@ -1,16 +1,15 @@ package com.linkedin.metadata.boot; -import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.key.DataHubUpgradeKey; +import com.linkedin.metadata.utils.AuditStampUtils; import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.upgrade.DataHubUpgradeResult; -import java.net.URISyntaxException; import javax.annotation.Nonnull; /** A single step in the Bootstrap process. 
*/ @@ -40,24 +39,10 @@ static Urn getUpgradeUrn(String upgradeId) { new DataHubUpgradeKey().setId(upgradeId), Constants.DATA_HUB_UPGRADE_ENTITY_NAME); } - static void setUpgradeResult(Urn urn, EntityService entityService) throws URISyntaxException { - final AuditStamp auditStamp = - new AuditStamp() - .setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)) - .setTime(System.currentTimeMillis()); + static void setUpgradeResult(Urn urn, EntityService entityService) { final DataHubUpgradeResult upgradeResult = new DataHubUpgradeResult().setTimestampMs(System.currentTimeMillis()); - // Workaround because entity service does not auto-generate the key aspect for us - final MetadataChangeProposal keyProposal = new MetadataChangeProposal(); - final DataHubUpgradeKey upgradeKey = new DataHubUpgradeKey().setId(urn.getId()); - keyProposal.setEntityUrn(urn); - keyProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME); - keyProposal.setAspectName(Constants.DATA_HUB_UPGRADE_KEY_ASPECT_NAME); - keyProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeKey)); - keyProposal.setChangeType(ChangeType.UPSERT); - entityService.ingestProposal(keyProposal, auditStamp, false); - // Ingest the upgrade result final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal(); upgradeProposal.setEntityUrn(urn); @@ -65,6 +50,6 @@ static void setUpgradeResult(Urn urn, EntityService entityService) throws URI upgradeProposal.setAspectName(Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME); upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeResult)); upgradeProposal.setChangeType(ChangeType.UPSERT); - entityService.ingestProposal(upgradeProposal, auditStamp, false); + entityService.ingestProposal(upgradeProposal, AuditStampUtils.createDefaultAuditStamp(), false); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHESerializer.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHESerializer.java deleted file mode 100644 index 36fe514d5536f..0000000000000 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHESerializer.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.linkedin.metadata.boot.kafka; - -import static com.linkedin.gms.factory.kafka.schemaregistry.DUHESchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY; - -import com.linkedin.metadata.EventUtils; -import io.confluent.kafka.schemaregistry.avro.AvroSchema; -import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import java.io.IOException; -import java.util.Map; -import lombok.extern.slf4j.Slf4j; - -/** Used for early bootstrap to avoid contact with not yet existing schema registry */ -@Slf4j -public class MockDUHESerializer extends KafkaAvroSerializer { - - private static final String DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT_SUFFIX = "-value"; - - private String topicName; - - public MockDUHESerializer() { - this.schemaRegistry = buildMockSchemaRegistryClient(); - } - - public MockDUHESerializer(SchemaRegistryClient client) { - super(client); - this.schemaRegistry = buildMockSchemaRegistryClient(); - } - - public MockDUHESerializer(SchemaRegistryClient client, Map props) { - super(client, props); - this.schemaRegistry = buildMockSchemaRegistryClient(); - } - - @Override - public void 
configure(Map<String, ?> configs, boolean isKey) {
-    super.configure(configs, isKey);
-    topicName = configs.get(DUHE_SCHEMA_REGISTRY_TOPIC_KEY).toString();
-  }
-
-  private MockSchemaRegistryClient buildMockSchemaRegistryClient() {
-    MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
-    try {
-      schemaRegistry.register(
-          topicToSubjectName(topicName), new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA));
-      return schemaRegistry;
-    } catch (IOException | RestClientException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static String topicToSubjectName(String topicName) {
-    return topicName + DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT_SUFFIX;
-  }
-}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHEDeserializer.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateDeserializer.java
similarity index 57%
rename from metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHEDeserializer.java
rename to metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateDeserializer.java
index e631f776abd08..74a20cdacbb21 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHEDeserializer.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateDeserializer.java
@@ -1,50 +1,49 @@ package com.linkedin.metadata.boot.kafka;

-import static com.linkedin.gms.factory.kafka.schemaregistry.DUHESchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY;
-import static com.linkedin.metadata.boot.kafka.MockDUHESerializer.topicToSubjectName;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX;
+import static com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer.topicToSubjectName;

 import com.linkedin.metadata.EventUtils;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.io.IOException;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;

-/** Used for early bootstrap to avoid contact with not yet existing schema registry */
+/**
+ * Used for early bootstrap to avoid contact with a not yet existing schema registry. Only supports
+ * the DUHE topic.
+ */
 @Slf4j
-public class MockDUHEDeserializer extends KafkaAvroDeserializer {
+public class MockSystemUpdateDeserializer extends KafkaAvroDeserializer {

   private String topicName;
-
-  public MockDUHEDeserializer() {
-    this.schemaRegistry = buildMockSchemaRegistryClient();
-  }
-
-  public MockDUHEDeserializer(SchemaRegistryClient client) {
-    super(client);
-    this.schemaRegistry = buildMockSchemaRegistryClient();
-  }
-
-  public MockDUHEDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
-    super(client, props);
-    this.schemaRegistry = buildMockSchemaRegistryClient();
-  }
+  private Integer schemaId;

   @Override
   public void configure(Map<String, ?> configs, boolean isKey) {
     super.configure(configs, isKey);
     topicName = configs.get(DUHE_SCHEMA_REGISTRY_TOPIC_KEY).toString();
+    schemaId =
+        
Integer.valueOf(
+            configs
+                .get(DUHE_SCHEMA_REGISTRY_TOPIC_KEY + SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX)
+                .toString());
+    this.schemaRegistry = buildMockSchemaRegistryClient();
   }

   private MockSchemaRegistryClient buildMockSchemaRegistryClient() {
-    MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient2();
+    MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient2(schemaId);
     try {
       schemaRegistry.register(
-          topicToSubjectName(topicName), new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA));
+          topicToSubjectName(topicName),
+          new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA),
+          0,
+          schemaId);
       return schemaRegistry;
     } catch (IOException | RestClientException e) {
       throw new RuntimeException(e);
@@ -52,13 +51,19 @@ private MockSchemaRegistryClient buildMockSchemaRegistryClient() {
   }

   public static class MockSchemaRegistryClient2 extends MockSchemaRegistryClient {
+    private final int schemaId;
+
+    public MockSchemaRegistryClient2(int schemaId) {
+      this.schemaId = schemaId;
+    }
+
     /**
      * Previously used topics can have schema ids > 1 that fully match. However, we are replacing
      * that registry, so force lookups to use the configured schema id.
      */
     @Override
     public synchronized ParsedSchema getSchemaById(int id) throws IOException, RestClientException {
-      return super.getSchemaById(1);
+      return super.getSchemaById(schemaId);
     }
   }
 }
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateSerializer.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateSerializer.java
new file mode 100644
index 0000000000000..14aac2758a69d
--- /dev/null
+++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateSerializer.java
@@ -0,0 +1,76 @@
+package com.linkedin.metadata.boot.kafka;
+
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.SYSTEM_UPDATE_TOPIC_KEY_PREFIX;
+
+import com.linkedin.metadata.EventUtils;
+import com.linkedin.util.Pair;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import java.io.IOException;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
+/** Used for early bootstrap to avoid contact with a not yet existing schema registry. */
+@Slf4j
+public class MockSystemUpdateSerializer extends KafkaAvroSerializer {
+
+  private static final String DATAHUB_SYSTEM_UPDATE_SUBJECT_SUFFIX = "-value";
+
+  private static final Map<String, AvroSchema> AVRO_SCHEMA_MAP =
+      Map.of(
+          DUHE_SCHEMA_REGISTRY_TOPIC_KEY, new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA),
+          MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY,
+          new AvroSchema(EventUtils.ORIGINAL_MCL_AVRO_SCHEMA));
+
+  private Map<String, Pair<AvroSchema, Integer>> topicNameToAvroSchemaMap;
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    super.configure(configs, isKey);
+    topicNameToAvroSchemaMap =
+        configs.entrySet().stream()
+            .filter(
+                e ->
+                    
e.getKey().startsWith(SYSTEM_UPDATE_TOPIC_KEY_PREFIX) + && !e.getKey().endsWith(SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX) + && e.getValue() instanceof String) + .map( + e -> { + Integer id = + Integer.valueOf( + (String) configs.get(e.getKey() + SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX)); + return Pair.of( + (String) e.getValue(), Pair.of(AVRO_SCHEMA_MAP.get(e.getKey()), id)); + }) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + this.schemaRegistry = buildMockSchemaRegistryClient(); + } + + private MockSchemaRegistryClient buildMockSchemaRegistryClient() { + MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient(); + + if (topicNameToAvroSchemaMap != null) { + topicNameToAvroSchemaMap.forEach( + (topicName, schemaId) -> { + try { + schemaRegistry.register( + topicToSubjectName(topicName), schemaId.getFirst(), 0, schemaId.getSecond()); + } catch (IOException | RestClientException e) { + throw new RuntimeException(e); + } + }); + } + + return schemaRegistry; + } + + public static String topicToSubjectName(String topicName) { + return topicName + DATAHUB_SYSTEM_UPDATE_SUBJECT_SUFFIX; + } +} diff --git a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java index 1678fe92ec70e..17c5160494722 100644 --- a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java +++ b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java @@ -122,7 +122,7 @@ public void testAsyncDefaultAspects() throws URISyntaxException { .request(req) .build()))); _aspectResource.ingestProposal(mcp, "false"); - verify(_producer, times(5)) + verify(_producer, times(10)) .produceMetadataChangeLog(eq(urn), any(AspectSpec.class), any(MetadataChangeLog.class)); verifyNoMoreInteractions(_producer); }
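
Note: the @Primary @Conditional(SystemUpdateCondition.class) producer bean in SystemUpdateConfig is the central wiring trick in this patch. During a SystemUpdate run, the DUHE producer (which does not need a live schema registry) shadows the default Kafka producer at every unqualified injection point, including the EntityService. Below is a minimal, self-contained sketch of that Spring pattern; DemoProducer, DemoSystemUpdateCondition, and DemoProducerConfig are illustrative stand-ins invented for this note, while the real beans are the KafkaEventProducer instances named kafkaEventProducer and duheKafkaEventProducer.

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.type.AnnotatedTypeMetadata;

/** Stand-in for KafkaEventProducer. */
class DemoProducer {
  final String name;

  DemoProducer(String name) {
    this.name = name;
  }
}

/** Same check as SystemUpdateCondition: true only when "SystemUpdate" is a non-option program argument. */
class DemoSystemUpdateCondition implements Condition {
  @Override
  public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
    return context.getBeanFactory().getBean(ApplicationArguments.class).getNonOptionArgs().stream()
        .anyMatch("SystemUpdate"::equals);
  }
}

@Configuration
class DemoProducerConfig {

  /** Analogous to the default kafkaEventProducer bean. */
  @Bean
  DemoProducer defaultProducer() {
    return new DemoProducer("default");
  }

  /** Analogous to duheKafkaEventProducer, the pre-bootstrap producer. */
  @Bean
  DemoProducer duheProducer() {
    return new DemoProducer("duhe");
  }

  /**
   * Registered only when the condition matches; @Primary then makes every unqualified injection
   * point receive the pre-bootstrap producer instead of the default one.
   */
  @Primary
  @Bean
  @Conditional(DemoSystemUpdateCondition.class)
  DemoProducer producerOverride(@Qualifier("duheProducer") DemoProducer duheProducer) {
    return duheProducer;
  }
}

On a normal GMS start the condition fails, the override bean is never registered, and defaultProducer wins; when the upgrade CLI runs with the SystemUpdate argument, the override exists and, being primary, is injected everywhere. This is exactly how the patch forces the DUHE producer into the EntityService, as the DatahubUpgradeNoSchemaRegistryTest asserts.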
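
Note: MockSystemUpdateSerializer and MockSystemUpdateDeserializer work because Confluent's Avro wire format prefixes every record with a magic byte and a four-byte schema id, and a MockSchemaRegistryClient allows a schema to be pre-registered under an explicit id. A short sketch of that registration step, assuming the patched DataHub modules are on the classpath; the subject name (derived from the test_due_topic used in the tests) and the id 7 are illustrative, since the real id comes from SchemaRegistryService.getSchemaIdForTopic.

import com.linkedin.metadata.EventUtils;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;

public class PinnedSchemaIdDemo {
  public static void main(String[] args) throws Exception {
    MockSchemaRegistryClient registry = new MockSchemaRegistryClient();

    int pinnedId = 7; // illustrative; the patch reads the real id from the schema registry service

    // Register the DUHE schema under subject "<topic>-value" with an explicit version and id,
    // mirroring the register(...) call in MockSystemUpdateDeserializer.
    registry.register(
        "test_due_topic-value",
        new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA),
        0, // version
        pinnedId);

    // Consumption resolves the id embedded in each record's header, so this lookup
    // must succeed for deserialization to work without a live schema registry.
    System.out.println(registry.getSchemaById(pinnedId).canonicalString());
  }
}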
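
Note: both upgrade steps now share one idempotency contract: on success they record completion via BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, entityService), and on the next run skip(...) checks entityService.exists(UPGRADE_ID_URN, true). The toy sketch below captures that contract with a plain in-memory set standing in for the entity store; none of it is DataHub API, and the mirrored method names appear only in comments.

import java.util.HashSet;
import java.util.Set;

/** In-memory stand-in for the run-state kept via dataHubUpgrade aspects. */
public class UpgradeStateDemo {

  private final Set<String> completedUpgradeUrns = new HashSet<>(); // stands in for the entity store

  /** Mirrors BootstrapStep.setUpgradeResult: record that an upgrade id has finished. */
  void setUpgradeResult(String upgradeUrn) {
    completedUpgradeUrns.add(upgradeUrn);
  }

  /** Mirrors the steps' skip(...) logic: a previous run, or an env flag, means skip. */
  boolean skip(String upgradeUrn, boolean envFlagRecommendsSkip) {
    return completedUpgradeUrns.contains(upgradeUrn) || envFlagRecommendsSkip;
  }

  public static void main(String[] args) {
    UpgradeStateDemo state = new UpgradeStateDemo();
    String urn = "urn:li:dataHubUpgrade:via-node-cll-reindex-datajob";

    System.out.println(state.skip(urn, false)); // false: the first run executes the step
    state.setUpgradeResult(urn); // the step succeeded, record it
    System.out.println(state.skip(urn, false)); // true: subsequent runs are skipped
  }
}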