diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java
index ff8bd542fbdff..50847da07be73 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java
@@ -2,6 +2,10 @@
import com.linkedin.gms.factory.auth.AuthorizerChainFactory;
import com.linkedin.gms.factory.auth.DataHubAuthorizerFactory;
+import com.linkedin.gms.factory.graphql.GraphQLEngineFactory;
+import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory;
+import com.linkedin.gms.factory.kafka.SimpleKafkaConsumerFactory;
+import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
import com.linkedin.gms.factory.telemetry.ScheduledAnalyticsFactory;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -24,7 +28,11 @@
classes = {
ScheduledAnalyticsFactory.class,
AuthorizerChainFactory.class,
- DataHubAuthorizerFactory.class
+ DataHubAuthorizerFactory.class,
+ SimpleKafkaConsumerFactory.class,
+ KafkaEventConsumerFactory.class,
+ InternalSchemaRegistryFactory.class,
+ GraphQLEngineFactory.class
})
})
public class UpgradeCliApplication {
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java
index 406963c58fd71..2b2f4648f76e7 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java
@@ -3,6 +3,7 @@
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -11,7 +12,12 @@ public class BackfillBrowsePathsV2Config {
@Bean
public BackfillBrowsePathsV2 backfillBrowsePathsV2(
- EntityService<?> entityService, SearchService searchService) {
- return new BackfillBrowsePathsV2(entityService, searchService);
+ EntityService<?> entityService,
+ SearchService searchService,
+ @Value("${systemUpdate.browsePathsV2.enabled}") final boolean enabled,
+ @Value("${systemUpdate.browsePathsV2.reprocess.enabled}") final boolean reprocessEnabled,
+ @Value("${systemUpdate.browsePathsV2.batchSize}") final Integer batchSize) {
+ return new BackfillBrowsePathsV2(
+ entityService, searchService, enabled, reprocessEnabled, batchSize);
}
}
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java
index 06311e1853874..83dad80944f5f 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java
@@ -2,6 +2,7 @@
import com.linkedin.datahub.upgrade.system.via.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.EntityService;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -9,7 +10,10 @@
public class ReindexDataJobViaNodesCLLConfig {
@Bean
- public ReindexDataJobViaNodesCLL _reindexDataJobViaNodesCLL(EntityService<?> entityService) {
- return new ReindexDataJobViaNodesCLL(entityService);
+ public ReindexDataJobViaNodesCLL _reindexDataJobViaNodesCLL(
+ EntityService<?> entityService,
+ @Value("${systemUpdate.dataJobNodeCLL.enabled}") final boolean enabled,
+ @Value("${systemUpdate.dataJobNodeCLL.batchSize}") final Integer batchSize) {
+ return new ReindexDataJobViaNodesCLL(entityService, enabled, batchSize);
}
}
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateCondition.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateCondition.java
new file mode 100644
index 0000000000000..ea432dfa9f7df
--- /dev/null
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateCondition.java
@@ -0,0 +1,14 @@
+package com.linkedin.datahub.upgrade.config;
+
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.context.annotation.Condition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+
+public class SystemUpdateCondition implements Condition {
+ @Override
+ public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
+ return context.getBeanFactory().getBean(ApplicationArguments.class).getNonOptionArgs().stream()
+ .anyMatch("SystemUpdate"::equals);
+ }
+}
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java
index 177d4b531ba86..cde3a29248fb5 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java
@@ -8,6 +8,7 @@
import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
+import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
@@ -21,9 +22,12 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
@Slf4j
@Configuration
@@ -74,4 +78,23 @@ protected KafkaEventProducer duheKafkaEventProducer(
duheSchemaRegistryConfig, kafkaConfiguration, properties));
return new KafkaEventProducer(producer, topicConvention, kafkaHealthChecker);
}
+
+ /**
+ * The ReindexDataJobViaNodesCLLConfig step requires publishing to MCL. Overrides the default
+ * producer with this special producer, which doesn't require an active registry.
+ *
+ * <p>Use when INTERNAL registry and is SYSTEM_UPDATE
+ *
+ * <p>This forces this producer into the EntityService
+ */
+ @Primary
+ @Bean(name = "kafkaEventProducer")
+ @Conditional(SystemUpdateCondition.class)
+ @ConditionalOnProperty(
+ name = "kafka.schemaRegistry.type",
+ havingValue = InternalSchemaRegistryFactory.TYPE)
+ protected KafkaEventProducer kafkaEventProducer(
+ @Qualifier("duheKafkaEventProducer") KafkaEventProducer kafkaEventProducer) {
+ return kafkaEventProducer;
+ }
}
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java
index 4b9fc5bba0204..9b023e1e239a2 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java
@@ -11,8 +11,20 @@ public class BackfillBrowsePathsV2 implements Upgrade {
private final List<UpgradeStep> _steps;
- public BackfillBrowsePathsV2(EntityService<?> entityService, SearchService searchService) {
- _steps = ImmutableList.of(new BackfillBrowsePathsV2Step(entityService, searchService));
+ public BackfillBrowsePathsV2(
+ EntityService<?> entityService,
+ SearchService searchService,
+ boolean enabled,
+ boolean reprocessEnabled,
+ Integer batchSize) {
+ if (enabled) {
+ _steps =
+ ImmutableList.of(
+ new BackfillBrowsePathsV2Step(
+ entityService, searchService, reprocessEnabled, batchSize));
+ } else {
+ _steps = ImmutableList.of();
+ }
}
@Override
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java
index 601ce4d25493c..2d64e0052ae82 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java
@@ -16,6 +16,7 @@
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil;
+import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Condition;
@@ -37,9 +38,8 @@
@Slf4j
public class BackfillBrowsePathsV2Step implements UpgradeStep {
- public static final String BACKFILL_BROWSE_PATHS_V2 = "BACKFILL_BROWSE_PATHS_V2";
- public static final String REPROCESS_DEFAULT_BROWSE_PATHS_V2 =
- "REPROCESS_DEFAULT_BROWSE_PATHS_V2";
+ private static final String UPGRADE_ID = "BackfillBrowsePathsV2Step";
+ private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);
public static final String DEFAULT_BROWSE_PATH_V2 = "␟Default";
private static final Set<String> ENTITY_TYPES_TO_MIGRATE =
@@ -53,14 +53,22 @@ public class BackfillBrowsePathsV2Step implements UpgradeStep {
Constants.ML_MODEL_GROUP_ENTITY_NAME,
Constants.ML_FEATURE_TABLE_ENTITY_NAME,
Constants.ML_FEATURE_ENTITY_NAME);
- private static final Integer BATCH_SIZE = 5000;
- private final EntityService<?> _entityService;
- private final SearchService _searchService;
-
- public BackfillBrowsePathsV2Step(EntityService<?> entityService, SearchService searchService) {
- _searchService = searchService;
- _entityService = entityService;
+ private final EntityService<?> entityService;
+ private final SearchService searchService;
+
+ private final boolean reprocessEnabled;
+ private final Integer batchSize;
+
+ public BackfillBrowsePathsV2Step(
+ EntityService<?> entityService,
+ SearchService searchService,
+ boolean reprocessEnabled,
+ Integer batchSize) {
+ this.searchService = searchService;
+ this.entityService = entityService;
+ this.reprocessEnabled = reprocessEnabled;
+ this.batchSize = batchSize;
}
@Override
@@ -78,11 +86,14 @@ public Function executable() {
log.info(
String.format(
"Upgrading batch %s-%s of browse paths for entity type %s",
- migratedCount, migratedCount + BATCH_SIZE, entityType));
+ migratedCount, migratedCount + batchSize, entityType));
scrollId = backfillBrowsePathsV2(entityType, auditStamp, scrollId);
- migratedCount += BATCH_SIZE;
+ migratedCount += batchSize;
} while (scrollId != null);
}
+
+ BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, entityService);
+
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
};
}
@@ -91,27 +102,27 @@ private String backfillBrowsePathsV2(String entityType, AuditStamp auditStamp, S
final Filter filter;
- if (System.getenv().containsKey(REPROCESS_DEFAULT_BROWSE_PATHS_V2)
- && Boolean.parseBoolean(System.getenv(REPROCESS_DEFAULT_BROWSE_PATHS_V2))) {
+ if (reprocessEnabled) {
filter = backfillDefaultBrowsePathsV2Filter();
} else {
filter = backfillBrowsePathsV2Filter();
}
final ScrollResult scrollResult =
- _searchService.scrollAcrossEntities(
+ searchService.scrollAcrossEntities(
ImmutableList.of(entityType),
"*",
filter,
null,
scrollId,
null,
- BATCH_SIZE,
+ batchSize,
new SearchFlags()
.setFulltext(true)
.setSkipCache(true)
.setSkipHighlighting(true)
.setSkipAggregates(true));
+
if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) {
return null;
}
@@ -183,7 +194,7 @@ private Filter backfillDefaultBrowsePathsV2Filter() {
private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exception {
BrowsePathsV2 browsePathsV2 =
- DefaultAspectsUtil.buildDefaultBrowsePathV2(urn, true, _entityService);
+ DefaultAspectsUtil.buildDefaultBrowsePathV2(urn, true, entityService);
log.debug(String.format("Adding browse path v2 for urn %s with value %s", urn, browsePathsV2));
MetadataChangeProposal proposal = new MetadataChangeProposal();
proposal.setEntityUrn(urn);
@@ -193,12 +204,12 @@ private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exceptio
proposal.setSystemMetadata(
new SystemMetadata().setRunId(DEFAULT_RUN_ID).setLastObserved(System.currentTimeMillis()));
proposal.setAspect(GenericRecordUtils.serializeAspect(browsePathsV2));
- _entityService.ingestProposal(proposal, auditStamp, true);
+ entityService.ingestProposal(proposal, auditStamp, true);
}
@Override
public String id() {
- return "BackfillBrowsePathsV2Step";
+ return UPGRADE_ID;
}
/**
@@ -211,7 +222,22 @@ public boolean isOptional() {
}
@Override
+ /**
+ * Returns whether the upgrade should be skipped. Uses previous run history or the environment
+ * variables REPROCESS_DEFAULT_BROWSE_PATHS_V2 & BACKFILL_BROWSE_PATHS_V2 to determine whether to
+ * skip.
+ */
public boolean skip(UpgradeContext context) {
- return !Boolean.parseBoolean(System.getenv(BACKFILL_BROWSE_PATHS_V2));
+ boolean envEnabled = Boolean.parseBoolean(System.getenv("BACKFILL_BROWSE_PATHS_V2"));
+
+ if (reprocessEnabled && envEnabled) {
+ return false;
+ }
+
+ boolean previouslyRun = entityService.exists(UPGRADE_ID_URN, true);
+ if (previouslyRun) {
+ log.info("{} was already run. Skipping.", id());
+ }
+ return (previouslyRun || !envEnabled);
}
}
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLL.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLL.java
index 41179a50c4b54..59975693322d1 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLL.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLL.java
@@ -18,8 +18,13 @@ public class ReindexDataJobViaNodesCLL implements Upgrade {
private final List<UpgradeStep> _steps;
- public ReindexDataJobViaNodesCLL(EntityService<?> entityService) {
- _steps = ImmutableList.of(new ReindexDataJobViaNodesCLLStep(entityService));
+ public ReindexDataJobViaNodesCLL(
+ EntityService<?> entityService, boolean enabled, Integer batchSize) {
+ if (enabled) {
+ _steps = ImmutableList.of(new ReindexDataJobViaNodesCLLStep(entityService, batchSize));
+ } else {
+ _steps = ImmutableList.of();
+ }
}
@Override
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLLStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLLStep.java
index 70afbc3d205b2..56166caf5b57e 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLLStep.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLLStep.java
@@ -11,7 +11,6 @@
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult;
-import java.net.URISyntaxException;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
@@ -21,12 +20,12 @@ public class ReindexDataJobViaNodesCLLStep implements UpgradeStep {
private static final String UPGRADE_ID = "via-node-cll-reindex-datajob";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);
- private static final Integer BATCH_SIZE = 5000;
+ private final EntityService<?> entityService;
+ private final Integer batchSize;
- private final EntityService _entityService;
-
- public ReindexDataJobViaNodesCLLStep(EntityService entityService) {
- _entityService = entityService;
+ public ReindexDataJobViaNodesCLLStep(EntityService<?> entityService, Integer batchSize) {
+ this.entityService = entityService;
+ this.batchSize = batchSize;
}
@Override
@@ -35,17 +34,16 @@ public Function executable() {
RestoreIndicesArgs args =
new RestoreIndicesArgs()
.setAspectName(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)
- .setUrnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%");
+ .setUrnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%")
+ .setBatchSize(batchSize);
RestoreIndicesResult result =
- _entityService.restoreIndices(args, x -> context.report().addLine((String) x));
+ entityService.restoreIndices(args, x -> context.report().addLine((String) x));
context.report().addLine("Rows migrated: " + result.rowsMigrated);
context.report().addLine("Rows ignored: " + result.ignored);
- try {
- BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, _entityService);
- context.report().addLine("State updated: " + UPGRADE_ID_URN);
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
+
+ BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, entityService);
+ context.report().addLine("State updated: " + UPGRADE_ID_URN);
+
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
};
}
@@ -70,7 +68,7 @@ public boolean isOptional() {
* variable SKIP_REINDEX_DATA_JOB_INPUT_OUTPUT to determine whether to skip.
*/
public boolean skip(UpgradeContext context) {
- boolean previouslyRun = _entityService.exists(UPGRADE_ID_URN, true);
+ boolean previouslyRun = entityService.exists(UPGRADE_ID_URN, true);
boolean envFlagRecommendsSkip =
Boolean.parseBoolean(System.getenv("SKIP_REINDEX_DATA_JOB_INPUT_OUTPUT"));
if (previouslyRun) {
diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java
index 83b8e028727ce..4c9e12c0ed151 100644
--- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java
+++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java
@@ -4,6 +4,8 @@
import static org.testng.AssertJUnit.assertNotNull;
import com.linkedin.datahub.upgrade.system.SystemUpdate;
+import com.linkedin.metadata.dao.producer.KafkaEventProducer;
+import com.linkedin.metadata.entity.EntityServiceImpl;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -19,19 +21,37 @@
classes = {UpgradeCliApplication.class, UpgradeCliApplicationTestConfiguration.class},
properties = {
"kafka.schemaRegistry.type=INTERNAL",
- "DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=test_due_topic"
- })
+ "DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=test_due_topic",
+ "METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME=test_mcl_versioned_topic"
+ },
+ args = {"-u", "SystemUpdate"})
public class DatahubUpgradeNoSchemaRegistryTest extends AbstractTestNGSpringContextTests {
@Autowired
@Named("systemUpdate")
private SystemUpdate systemUpdate;
+ @Autowired
+ @Named("kafkaEventProducer")
+ private KafkaEventProducer kafkaEventProducer;
+
+ @Autowired
+ @Named("duheKafkaEventProducer")
+ private KafkaEventProducer duheKafkaEventProducer;
+
+ @Autowired private EntityServiceImpl entityService;
+
@Test
public void testSystemUpdateInit() {
assertNotNull(systemUpdate);
}
+ @Test
+ public void testSystemUpdateKafkaProducerOverride() {
+ assertEquals(kafkaEventProducer, duheKafkaEventProducer);
+ assertEquals(entityService.get_producer(), duheKafkaEventProducer);
+ }
+
@Test
public void testSystemUpdateSend() {
UpgradeStepResult.Result result =
diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java
index be28b7f739cf5..5c2d6fff0f07c 100644
--- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java
+++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java
@@ -1,15 +1,21 @@
package com.linkedin.datahub.upgrade;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
-import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
+import com.linkedin.metadata.registry.SchemaRegistryService;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import io.ebean.Database;
+import java.util.Optional;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
@TestConfiguration
@@ -20,8 +26,6 @@ public class UpgradeCliApplicationTestConfiguration {
@MockBean private Database ebeanServer;
- @MockBean private EntityService<?> _entityService;
-
@MockBean private SearchService searchService;
@MockBean private GraphService graphService;
@@ -31,4 +35,11 @@ public class UpgradeCliApplicationTestConfiguration {
@MockBean ConfigEntityRegistry configEntityRegistry;
@MockBean public EntityIndexBuilders entityIndexBuilders;
+
+ @Bean
+ public SchemaRegistryService schemaRegistryService() {
+ SchemaRegistryService mockService = mock(SchemaRegistryService.class);
+ when(mockService.getSchemaIdForTopic(anyString())).thenReturn(Optional.of(0));
+ return mockService;
+ }
}
diff --git a/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java b/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java
index 645c2fe210e09..adff32d5d336d 100644
--- a/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java
+++ b/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java
@@ -57,7 +57,7 @@ public class EventUtils {
private static final Schema ORIGINAL_MCP_AVRO_SCHEMA =
getAvroSchemaFromResource("avro/com/linkedin/mxe/MetadataChangeProposal.avsc");
- private static final Schema ORIGINAL_MCL_AVRO_SCHEMA =
+ public static final Schema ORIGINAL_MCL_AVRO_SCHEMA =
getAvroSchemaFromResource("avro/com/linkedin/mxe/MetadataChangeLog.avsc");
private static final Schema ORIGINAL_FMCL_AVRO_SCHEMA =
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
index 7f15e3a7fd8fc..eec5c6120886d 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
@@ -15,6 +15,7 @@
import com.codahale.metrics.Timer;
import com.datahub.util.RecordUtils;
import com.datahub.util.exception.ModelConversionException;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
@@ -146,7 +147,8 @@ public class EntityServiceImpl implements EntityService {
private static final int DEFAULT_MAX_TRANSACTION_RETRY = 3;
protected final AspectDao _aspectDao;
- private final EventProducer _producer;
+
+ @VisibleForTesting @Getter private final EventProducer _producer;
private final EntityRegistry _entityRegistry;
private final Map<String, Set<String>> _entityToValidAspects;
private RetentionService _retentionService;
@@ -637,10 +639,15 @@ public List ingestAspects(
@Override
public List ingestAspects(
@Nonnull final AspectsBatch aspectsBatch, boolean emitMCL, boolean overwrite) {
+ Set items = new HashSet<>(aspectsBatch.getItems());
+
+ // Generate additional items as needed
+ items.addAll(DefaultAspectsUtil.getAdditionalChanges(aspectsBatch, this, enableBrowseV2));
+ AspectsBatch withDefaults = AspectsBatchImpl.builder().items(items).build();
Timer.Context ingestToLocalDBTimer =
MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time();
- List ingestResults = ingestAspectsToLocalDB(aspectsBatch, overwrite);
+ List ingestResults = ingestAspectsToLocalDB(withDefaults, overwrite);
List mclResults = emitMCL(ingestResults, emitMCL);
ingestToLocalDBTimer.stop();
@@ -964,7 +971,7 @@ public IngestResult ingestProposal(
*/
@Override
public Set ingestProposal(AspectsBatch aspectsBatch, final boolean async) {
- Stream timeseriesIngestResults = ingestTimeseriesProposal(aspectsBatch);
+ Stream timeseriesIngestResults = ingestTimeseriesProposal(aspectsBatch, async);
Stream nonTimeseriesIngestResults =
async ? ingestProposalAsync(aspectsBatch) : ingestProposalSync(aspectsBatch);
@@ -978,7 +985,8 @@ public Set ingestProposal(AspectsBatch aspectsBatch, final boolean
* @param aspectsBatch timeseries upserts batch
* @return returns ingest proposal result, however was never in the MCP topic
*/
- private Stream ingestTimeseriesProposal(AspectsBatch aspectsBatch) {
+ private Stream ingestTimeseriesProposal(
+ AspectsBatch aspectsBatch, final boolean async) {
List<? extends BatchItem> unsupported =
aspectsBatch.getItems().stream()
.filter(
@@ -992,6 +1000,20 @@ private Stream ingestTimeseriesProposal(AspectsBatch aspectsBatch)
+ unsupported.stream().map(BatchItem::getChangeType).collect(Collectors.toSet()));
}
+ if (!async) {
+ // Create default non-timeseries aspects for timeseries aspects
+ List timeseriesItems =
+ aspectsBatch.getItems().stream()
+ .filter(item -> item.getAspectSpec().isTimeseries())
+ .collect(Collectors.toList());
+
+ List defaultAspects =
+ DefaultAspectsUtil.getAdditionalChanges(
+ AspectsBatchImpl.builder().items(timeseriesItems).build(), this, enableBrowseV2);
+ ingestProposalSync(AspectsBatchImpl.builder().items(defaultAspects).build());
+ }
+
+ // Emit timeseries MCLs
List, Boolean>>>> timeseriesResults =
aspectsBatch.getItems().stream()
.filter(item -> item.getAspectSpec().isTimeseries())
@@ -1080,17 +1102,10 @@ private Stream ingestProposalAsync(AspectsBatch aspectsBatch) {
}
private Stream ingestProposalSync(AspectsBatch aspectsBatch) {
- Set items = new HashSet<>(aspectsBatch.getItems());
-
- // Generate additional items as needed
- items.addAll(DefaultAspectsUtil.getAdditionalChanges(aspectsBatch, this, enableBrowseV2));
-
- AspectsBatch withDefaults = AspectsBatchImpl.builder().items(items).build();
-
AspectsBatchImpl nonTimeseries =
AspectsBatchImpl.builder()
.items(
- withDefaults.getItems().stream()
+ aspectsBatch.getItems().stream()
.filter(item -> !item.getAspectSpec().isTimeseries())
.collect(Collectors.toList()))
.build();
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java
index ea4e97d264bca..384b54c7a1c8d 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java
@@ -479,7 +479,7 @@ public void testIngestAspectsGetLatestAspects() throws Exception {
assertTrue(DataTemplateUtil.areEqual(writeAspect1, latestAspects.get(aspectName1)));
assertTrue(DataTemplateUtil.areEqual(writeAspect2, latestAspects.get(aspectName2)));
- verify(_mockProducer, times(2))
+ verify(_mockProducer, times(3))
.produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), Mockito.any());
verifyNoMoreInteractions(_mockProducer);
@@ -772,6 +772,12 @@ public void testUpdateGetAspect() throws AssertionError {
.produceMetadataChangeLog(
Mockito.eq(entityUrn), Mockito.eq(corpUserInfoSpec), Mockito.any());
+ verify(_mockProducer, times(1))
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserKey")),
+ Mockito.any());
+
verifyNoMoreInteractions(_mockProducer);
}
@@ -824,6 +830,13 @@ public void testGetAspectAtVersion() throws AssertionError {
readAspect1 = _entityServiceImpl.getVersionedAspect(entityUrn, aspectName, -1);
assertFalse(DataTemplateUtil.areEqual(writtenVersionedAspect1, readAspect1));
+ // check key aspect
+ verify(_mockProducer, times(1))
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpuser").getAspectSpec("corpUserKey")),
+ Mockito.any());
+
verifyNoMoreInteractions(_mockProducer);
}
@@ -1094,13 +1107,22 @@ public void testIngestGetLatestAspect() throws AssertionError {
ArgumentCaptor<MetadataChangeLog> mclCaptor = ArgumentCaptor.forClass(MetadataChangeLog.class);
verify(_mockProducer, times(1))
- .produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), mclCaptor.capture());
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserInfo")),
+ mclCaptor.capture());
MetadataChangeLog mcl = mclCaptor.getValue();
assertEquals(mcl.getEntityType(), "corpuser");
assertNull(mcl.getPreviousAspectValue());
assertNull(mcl.getPreviousSystemMetadata());
assertEquals(mcl.getChangeType(), ChangeType.UPSERT);
+ verify(_mockProducer, times(1))
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserKey")),
+ Mockito.any());
+
verifyNoMoreInteractions(_mockProducer);
reset(_mockProducer);
@@ -1201,7 +1223,16 @@ public void testIngestGetLatestEnvelopedAspect() throws Exception {
EntityUtils.parseSystemMetadata(readAspectDao1.getSystemMetadata()), metadata1));
verify(_mockProducer, times(2))
- .produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), Mockito.any());
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserInfo")),
+ Mockito.any());
+
+ verify(_mockProducer, times(1))
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserKey")),
+ Mockito.any());
verifyNoMoreInteractions(_mockProducer);
}
@@ -1234,9 +1265,18 @@ public void testIngestSameAspect() throws AssertionError {
RecordTemplate readAspect1 = _entityServiceImpl.getLatestAspect(entityUrn, aspectName);
assertTrue(DataTemplateUtil.areEqual(writeAspect1, readAspect1));
+ verify(_mockProducer, times(1))
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserKey")),
+ Mockito.any());
+
ArgumentCaptor<MetadataChangeLog> mclCaptor = ArgumentCaptor.forClass(MetadataChangeLog.class);
verify(_mockProducer, times(1))
- .produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), mclCaptor.capture());
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserInfo")),
+ mclCaptor.capture());
MetadataChangeLog mcl = mclCaptor.getValue();
assertEquals(mcl.getEntityType(), "corpuser");
assertNull(mcl.getPreviousAspectValue());
diff --git a/metadata-service/configuration/src/main/resources/application.yml b/metadata-service/configuration/src/main/resources/application.yml
index d4c11d4aa53bd..c2a0d508b57d6 100644
--- a/metadata-service/configuration/src/main/resources/application.yml
+++ b/metadata-service/configuration/src/main/resources/application.yml
@@ -314,6 +314,14 @@ systemUpdate:
maxBackOffs: ${BOOTSTRAP_SYSTEM_UPDATE_MAX_BACK_OFFS:50}
backOffFactor: ${BOOTSTRAP_SYSTEM_UPDATE_BACK_OFF_FACTOR:2} # Multiplicative factor for back off, default values will result in waiting 5min 15s
waitForSystemUpdate: ${BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE:true}
+ dataJobNodeCLL:
+ enabled: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED:true}
+ batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_BATCH_SIZE:200}
+ browsePathsV2:
+ enabled: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_ENABLED:true}
+ batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_BATCH_SIZE:5000}
+ reprocess:
+ enabled: ${REPROCESS_DEFAULT_BROWSE_PATHS_V2:false}
structuredProperties:
enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java
index 871f16d97be33..2ccdee5fb1dbf 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java
@@ -1,20 +1,15 @@
package com.linkedin.gms.factory.entity;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
-import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
-import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityServiceImpl;
import com.linkedin.metadata.entity.ebean.batch.MCPUpsertBatchItem;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.service.UpdateIndicesService;
-import com.linkedin.mxe.TopicConvention;
import javax.annotation.Nonnull;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.kafka.clients.producer.Producer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
@@ -28,26 +23,16 @@ public class EntityServiceFactory {
private Integer _ebeanMaxTransactionRetry;
@Bean(name = "entityService")
- @DependsOn({
- "entityAspectDao",
- "kafkaEventProducer",
- "kafkaHealthChecker",
- TopicConventionFactory.TOPIC_CONVENTION_BEAN,
- "entityRegistry"
- })
+ @DependsOn({"entityAspectDao", "kafkaEventProducer", "entityRegistry"})
@Nonnull
protected EntityService createInstance(
- Producer producer,
- TopicConvention convention,
- KafkaHealthChecker kafkaHealthChecker,
+ @Qualifier("kafkaEventProducer") final KafkaEventProducer eventProducer,
@Qualifier("entityAspectDao") AspectDao aspectDao,
EntityRegistry entityRegistry,
ConfigurationProvider configurationProvider,
UpdateIndicesService updateIndicesService,
@Value("${featureFlags.showBrowseV2}") final boolean enableBrowsePathV2) {
- final KafkaEventProducer eventProducer =
- new KafkaEventProducer(producer, convention, kafkaHealthChecker);
FeatureFlags featureFlags = configurationProvider.getFeatureFlags();
return new EntityServiceImpl(
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/DUHESchemaRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/DUHESchemaRegistryFactory.java
deleted file mode 100644
index 4819984307af9..0000000000000
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/DUHESchemaRegistryFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.linkedin.gms.factory.kafka.schemaregistry;
-
-import static com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener.TOPIC_NAME;
-
-import com.linkedin.gms.factory.config.ConfigurationProvider;
-import com.linkedin.metadata.boot.kafka.MockDUHEDeserializer;
-import com.linkedin.metadata.boot.kafka.MockDUHESerializer;
-import com.linkedin.metadata.config.kafka.KafkaConfiguration;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
-import java.util.HashMap;
-import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Slf4j
-@Configuration
-public class DUHESchemaRegistryFactory {
-
- public static final String DUHE_SCHEMA_REGISTRY_TOPIC_KEY = "duheTopicName";
-
- @Value(TOPIC_NAME)
- private String duheTopicName;
-
- /** Configure Kafka Producer/Consumer processes with a custom schema registry. */
- @Bean("duheSchemaRegistryConfig")
- protected SchemaRegistryConfig duheSchemaRegistryConfig(ConfigurationProvider provider) {
- Map props = new HashMap<>();
- KafkaConfiguration kafkaConfiguration = provider.getKafka();
-
- props.put(
- AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
- kafkaConfiguration.getSchemaRegistry().getUrl());
- props.put(DUHE_SCHEMA_REGISTRY_TOPIC_KEY, duheTopicName);
-
- log.info("DataHub System Update Registry");
- return new SchemaRegistryConfig(MockDUHESerializer.class, MockDUHEDeserializer.class, props);
- }
-}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java
index 8c814e5054758..46b27195ecc67 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java
@@ -1,11 +1,7 @@
package com.linkedin.gms.factory.kafka.schemaregistry;
-import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
-import com.linkedin.metadata.registry.SchemaRegistryService;
-import com.linkedin.metadata.registry.SchemaRegistryServiceImpl;
-import com.linkedin.mxe.TopicConvention;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
@@ -17,7 +13,6 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.DependsOn;
@Slf4j
@Configuration
@@ -45,11 +40,4 @@ protected SchemaRegistryConfig getInstance(
kafkaConfiguration.getSchemaRegistry().getUrl());
return new SchemaRegistryConfig(KafkaAvroSerializer.class, KafkaAvroDeserializer.class, props);
}
-
- @Bean(name = "schemaRegistryService")
- @Nonnull
- @DependsOn({TopicConventionFactory.TOPIC_CONVENTION_BEAN})
- protected SchemaRegistryService schemaRegistryService(TopicConvention convention) {
- return new SchemaRegistryServiceImpl(convention);
- }
}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SchemaRegistryServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SchemaRegistryServiceFactory.java
new file mode 100644
index 0000000000000..a6869321d796f
--- /dev/null
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SchemaRegistryServiceFactory.java
@@ -0,0 +1,20 @@
+package com.linkedin.gms.factory.kafka.schemaregistry;
+
+import com.linkedin.gms.factory.common.TopicConventionFactory;
+import com.linkedin.metadata.registry.SchemaRegistryService;
+import com.linkedin.metadata.registry.SchemaRegistryServiceImpl;
+import com.linkedin.mxe.TopicConvention;
+import javax.annotation.Nonnull;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.DependsOn;
+
+@Configuration
+public class SchemaRegistryServiceFactory {
+ @Bean(name = "schemaRegistryService")
+ @Nonnull
+ @DependsOn({TopicConventionFactory.TOPIC_CONVENTION_BEAN})
+ protected SchemaRegistryService schemaRegistryService(TopicConvention convention) {
+ return new SchemaRegistryServiceImpl(convention);
+ }
+}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SystemUpdateSchemaRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SystemUpdateSchemaRegistryFactory.java
new file mode 100644
index 0000000000000..d02cdc0e68f52
--- /dev/null
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SystemUpdateSchemaRegistryFactory.java
@@ -0,0 +1,66 @@
+package com.linkedin.gms.factory.kafka.schemaregistry;
+
+import static com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener.TOPIC_NAME;
+
+import com.linkedin.gms.factory.config.ConfigurationProvider;
+import com.linkedin.metadata.boot.kafka.MockSystemUpdateDeserializer;
+import com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer;
+import com.linkedin.metadata.config.kafka.KafkaConfiguration;
+import com.linkedin.metadata.registry.SchemaRegistryService;
+import com.linkedin.mxe.Topics;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Slf4j
+@Configuration
+public class SystemUpdateSchemaRegistryFactory {
+
+ public static final String SYSTEM_UPDATE_TOPIC_KEY_PREFIX = "data-hub.system-update.topic-key.";
+ public static final String SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX = ".id";
+
+ public static final String DUHE_SCHEMA_REGISTRY_TOPIC_KEY =
+ SYSTEM_UPDATE_TOPIC_KEY_PREFIX + "duhe";
+ public static final String MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY =
+ SYSTEM_UPDATE_TOPIC_KEY_PREFIX + "mcl-versioned";
+
+ @Value(TOPIC_NAME)
+ private String duheTopicName;
+
+ @Value("${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}")
+ private String mclTopicName;
+
+ /** Configure Kafka Producer/Consumer processes with a custom schema registry. */
+ @Bean("duheSchemaRegistryConfig")
+ protected SchemaRegistryConfig duheSchemaRegistryConfig(
+ final ConfigurationProvider provider, final SchemaRegistryService schemaRegistryService) {
+ Map props = new HashMap<>();
+ KafkaConfiguration kafkaConfiguration = provider.getKafka();
+
+ props.put(
+ AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
+ kafkaConfiguration.getSchemaRegistry().getUrl());
+
+ // topic names
+ props.putAll(
+ Map.of(
+ DUHE_SCHEMA_REGISTRY_TOPIC_KEY, duheTopicName,
+ MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY, mclTopicName));
+
+ // topic ordinals
+ props.putAll(
+ Map.of(
+ DUHE_SCHEMA_REGISTRY_TOPIC_KEY + SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX,
+ schemaRegistryService.getSchemaIdForTopic(duheTopicName).get().toString(),
+ MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY + SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX,
+ schemaRegistryService.getSchemaIdForTopic(mclTopicName).get().toString()));
+
+ log.info("DataHub System Update Registry");
+ return new SchemaRegistryConfig(
+ MockSystemUpdateSerializer.class, MockSystemUpdateDeserializer.class, props);
+ }
+}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java
index a79bdacfc55e9..2dccda4243bca 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java
@@ -1,16 +1,15 @@
package com.linkedin.metadata.boot;
-import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.DataHubUpgradeKey;
+import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.upgrade.DataHubUpgradeResult;
-import java.net.URISyntaxException;
import javax.annotation.Nonnull;
/** A single step in the Bootstrap process. */
@@ -40,24 +39,10 @@ static Urn getUpgradeUrn(String upgradeId) {
new DataHubUpgradeKey().setId(upgradeId), Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
}
- static void setUpgradeResult(Urn urn, EntityService<?> entityService) throws URISyntaxException {
- final AuditStamp auditStamp =
- new AuditStamp()
- .setActor(Urn.createFromString(Constants.SYSTEM_ACTOR))
- .setTime(System.currentTimeMillis());
+ static void setUpgradeResult(Urn urn, EntityService<?> entityService) {
final DataHubUpgradeResult upgradeResult =
new DataHubUpgradeResult().setTimestampMs(System.currentTimeMillis());
- // Workaround because entity service does not auto-generate the key aspect for us
- final MetadataChangeProposal keyProposal = new MetadataChangeProposal();
- final DataHubUpgradeKey upgradeKey = new DataHubUpgradeKey().setId(urn.getId());
- keyProposal.setEntityUrn(urn);
- keyProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
- keyProposal.setAspectName(Constants.DATA_HUB_UPGRADE_KEY_ASPECT_NAME);
- keyProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeKey));
- keyProposal.setChangeType(ChangeType.UPSERT);
- entityService.ingestProposal(keyProposal, auditStamp, false);
-
// Ingest the upgrade result
final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal();
upgradeProposal.setEntityUrn(urn);
@@ -65,6 +50,6 @@ static void setUpgradeResult(Urn urn, EntityService<?> entityService) throws URI
upgradeProposal.setAspectName(Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME);
upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeResult));
upgradeProposal.setChangeType(ChangeType.UPSERT);
- entityService.ingestProposal(upgradeProposal, auditStamp, false);
+ entityService.ingestProposal(upgradeProposal, AuditStampUtils.createDefaultAuditStamp(), false);
}
}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHESerializer.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHESerializer.java
deleted file mode 100644
index 36fe514d5536f..0000000000000
--- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHESerializer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.linkedin.metadata.boot.kafka;
-
-import static com.linkedin.gms.factory.kafka.schemaregistry.DUHESchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY;
-
-import com.linkedin.metadata.EventUtils;
-import io.confluent.kafka.schemaregistry.avro.AvroSchema;
-import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
-import io.confluent.kafka.serializers.KafkaAvroSerializer;
-import java.io.IOException;
-import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
-
-/** Used for early bootstrap to avoid contact with not yet existing schema registry */
-@Slf4j
-public class MockDUHESerializer extends KafkaAvroSerializer {
-
- private static final String DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT_SUFFIX = "-value";
-
- private String topicName;
-
- public MockDUHESerializer() {
- this.schemaRegistry = buildMockSchemaRegistryClient();
- }
-
- public MockDUHESerializer(SchemaRegistryClient client) {
- super(client);
- this.schemaRegistry = buildMockSchemaRegistryClient();
- }
-
- public MockDUHESerializer(SchemaRegistryClient client, Map props) {
- super(client, props);
- this.schemaRegistry = buildMockSchemaRegistryClient();
- }
-
- @Override
- public void configure(Map configs, boolean isKey) {
- super.configure(configs, isKey);
- topicName = configs.get(DUHE_SCHEMA_REGISTRY_TOPIC_KEY).toString();
- }
-
- private MockSchemaRegistryClient buildMockSchemaRegistryClient() {
- MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
- try {
- schemaRegistry.register(
- topicToSubjectName(topicName), new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA));
- return schemaRegistry;
- } catch (IOException | RestClientException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static String topicToSubjectName(String topicName) {
- return topicName + DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT_SUFFIX;
- }
-}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHEDeserializer.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateDeserializer.java
similarity index 57%
rename from metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHEDeserializer.java
rename to metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateDeserializer.java
index e631f776abd08..74a20cdacbb21 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHEDeserializer.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateDeserializer.java
@@ -1,50 +1,49 @@
package com.linkedin.metadata.boot.kafka;
-import static com.linkedin.gms.factory.kafka.schemaregistry.DUHESchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY;
-import static com.linkedin.metadata.boot.kafka.MockDUHESerializer.topicToSubjectName;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX;
+import static com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer.topicToSubjectName;
import com.linkedin.metadata.EventUtils;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.io.IOException;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
-/** Used for early bootstrap to avoid contact with not yet existing schema registry */
+/**
+ * Used for early bootstrap to avoid contact with not yet existing schema registry. Only supports the
+ * DUHE topic
+ */
@Slf4j
-public class MockDUHEDeserializer extends KafkaAvroDeserializer {
+public class MockSystemUpdateDeserializer extends KafkaAvroDeserializer {
private String topicName;
-
- public MockDUHEDeserializer() {
- this.schemaRegistry = buildMockSchemaRegistryClient();
- }
-
- public MockDUHEDeserializer(SchemaRegistryClient client) {
- super(client);
- this.schemaRegistry = buildMockSchemaRegistryClient();
- }
-
- public MockDUHEDeserializer(SchemaRegistryClient client, Map props) {
- super(client, props);
- this.schemaRegistry = buildMockSchemaRegistryClient();
- }
+ private Integer schemaId;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
super.configure(configs, isKey);
topicName = configs.get(DUHE_SCHEMA_REGISTRY_TOPIC_KEY).toString();
+ schemaId =
+ Integer.valueOf(
+ configs
+ .get(DUHE_SCHEMA_REGISTRY_TOPIC_KEY + SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX)
+ .toString());
+ this.schemaRegistry = buildMockSchemaRegistryClient();
}
private MockSchemaRegistryClient buildMockSchemaRegistryClient() {
- MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient2();
+ MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient2(schemaId);
try {
schemaRegistry.register(
- topicToSubjectName(topicName), new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA));
+ topicToSubjectName(topicName),
+ new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA),
+ 0,
+ schemaId);
return schemaRegistry;
} catch (IOException | RestClientException e) {
throw new RuntimeException(e);
@@ -52,13 +51,19 @@ private MockSchemaRegistryClient buildMockSchemaRegistryClient() {
}
public static class MockSchemaRegistryClient2 extends MockSchemaRegistryClient {
+ private final int schemaId;
+
+ public MockSchemaRegistryClient2(int schemaId) {
+ this.schemaId = schemaId;
+ }
+
/**
* Previously used topics can have schema ids > 1 which fully match however we are replacing
* that registry so force schema id to 1
*/
@Override
public synchronized ParsedSchema getSchemaById(int id) throws IOException, RestClientException {
- return super.getSchemaById(1);
+ return super.getSchemaById(schemaId);
}
}
}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateSerializer.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateSerializer.java
new file mode 100644
index 0000000000000..14aac2758a69d
--- /dev/null
+++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateSerializer.java
@@ -0,0 +1,76 @@
+package com.linkedin.metadata.boot.kafka;
+
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.SYSTEM_UPDATE_TOPIC_KEY_PREFIX;
+
+import com.linkedin.metadata.EventUtils;
+import com.linkedin.util.Pair;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import java.io.IOException;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
+/** Used for early bootstrap to avoid contact with not yet existing schema registry */
+@Slf4j
+public class MockSystemUpdateSerializer extends KafkaAvroSerializer {
+
+ private static final String DATAHUB_SYSTEM_UPDATE_SUBJECT_SUFFIX = "-value";
+
+ private static final Map<String, AvroSchema> AVRO_SCHEMA_MAP =
+ Map.of(
+ DUHE_SCHEMA_REGISTRY_TOPIC_KEY, new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA),
+ MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY,
+ new AvroSchema(EventUtils.ORIGINAL_MCL_AVRO_SCHEMA));
+
+ private Map<String, Pair<AvroSchema, Integer>> topicNameToAvroSchemaMap;
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ super.configure(configs, isKey);
+ topicNameToAvroSchemaMap =
+ configs.entrySet().stream()
+ .filter(
+ e ->
+ e.getKey().startsWith(SYSTEM_UPDATE_TOPIC_KEY_PREFIX)
+ && !e.getKey().endsWith(SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX)
+ && e.getValue() instanceof String)
+ .map(
+ e -> {
+ Integer id =
+ Integer.valueOf(
+ (String) configs.get(e.getKey() + SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX));
+ return Pair.of(
+ (String) e.getValue(), Pair.of(AVRO_SCHEMA_MAP.get(e.getKey()), id));
+ })
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ this.schemaRegistry = buildMockSchemaRegistryClient();
+ }
+
+ private MockSchemaRegistryClient buildMockSchemaRegistryClient() {
+ MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
+
+ if (topicNameToAvroSchemaMap != null) {
+ topicNameToAvroSchemaMap.forEach(
+ (topicName, schemaId) -> {
+ try {
+ schemaRegistry.register(
+ topicToSubjectName(topicName), schemaId.getFirst(), 0, schemaId.getSecond());
+ } catch (IOException | RestClientException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ return schemaRegistry;
+ }
+
+ public static String topicToSubjectName(String topicName) {
+ return topicName + DATAHUB_SYSTEM_UPDATE_SUBJECT_SUFFIX;
+ }
+}
diff --git a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java
index 1678fe92ec70e..17c5160494722 100644
--- a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java
+++ b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java
@@ -122,7 +122,7 @@ public void testAsyncDefaultAspects() throws URISyntaxException {
.request(req)
.build())));
_aspectResource.ingestProposal(mcp, "false");
- verify(_producer, times(5))
+ verify(_producer, times(10))
.produceMetadataChangeLog(eq(urn), any(AspectSpec.class), any(MetadataChangeLog.class));
verifyNoMoreInteractions(_producer);
}