diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java index bea38b616f86f..3a871a1814319 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java @@ -114,6 +114,16 @@ protected KafkaEventProducer kafkaEventProducer( return kafkaEventProducer; } + @Primary + @Bean(name = "schemaRegistryConfig") + @ConditionalOnProperty( + name = "kafka.schemaRegistry.type", + havingValue = InternalSchemaRegistryFactory.TYPE) + protected SchemaRegistryConfig schemaRegistryConfig( + @Qualifier("duheSchemaRegistryConfig") SchemaRegistryConfig duheSchemaRegistryConfig) { + return duheSchemaRegistryConfig; + } + @Configuration public static class SystemUpdateSetup { @Autowired private EntityService entityService; diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java index ed09a4a5aec2b..1e2a0201fb84c 100644 --- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java @@ -1,11 +1,20 @@ package com.linkedin.datahub.upgrade; +import static com.linkedin.metadata.EventUtils.RENAMED_MCL_AVRO_SCHEMA; +import static com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer.topicToSubjectName; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertNotNull; import com.linkedin.datahub.upgrade.system.SystemUpdate; +import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig; +import com.linkedin.metadata.boot.kafka.MockSystemUpdateDeserializer; +import com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer; import com.linkedin.metadata.dao.producer.KafkaEventProducer; import com.linkedin.metadata.entity.EntityServiceImpl; +import com.linkedin.mxe.Topics; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Optional; @@ -21,8 +30,8 @@ classes = {UpgradeCliApplication.class, UpgradeCliApplicationTestConfiguration.class}, properties = { "kafka.schemaRegistry.type=INTERNAL", - "DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=test_due_topic", - "METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME=test_mcl_versioned_topic" + "DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=" + Topics.DATAHUB_UPGRADE_HISTORY_TOPIC_NAME, + "METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME=" + Topics.METADATA_CHANGE_LOG_VERSIONED, }, args = {"-u", "SystemUpdate"}) public class DatahubUpgradeNoSchemaRegistryTest extends AbstractTestNGSpringContextTests { @@ -41,15 +50,29 @@ public class DatahubUpgradeNoSchemaRegistryTest extends AbstractTestNGSpringCont @Autowired private EntityServiceImpl entityService; + @Autowired + @Named("schemaRegistryConfig") + private SchemaRegistryConfig schemaRegistryConfig; + @Test public void testSystemUpdateInit() { assertNotNull(systemUpdate); } @Test - public void testSystemUpdateKafkaProducerOverride() { + public void testSystemUpdateKafkaProducerOverride() throws RestClientException, IOException { + assertEquals(schemaRegistryConfig.getDeserializer(), MockSystemUpdateDeserializer.class); + assertEquals(schemaRegistryConfig.getSerializer(), MockSystemUpdateSerializer.class); assertEquals(kafkaEventProducer, duheKafkaEventProducer); assertEquals(entityService.getProducer(), duheKafkaEventProducer); + + MockSystemUpdateSerializer serializer = new MockSystemUpdateSerializer(); + serializer.configure(schemaRegistryConfig.getProperties(), false); + SchemaRegistryClient registry = serializer.getSchemaRegistryClient(); + assertEquals( + registry.getId( + topicToSubjectName(Topics.METADATA_CHANGE_LOG_VERSIONED), RENAMED_MCL_AVRO_SCHEMA), + 2); } @Test diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java index e1257df9ad748..c190d1ed41c41 100644 --- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java @@ -12,6 +12,7 @@ import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; +import com.linkedin.mxe.Topics; import java.util.List; import javax.inject.Named; import org.springframework.beans.factory.annotation.Autowired; @@ -26,8 +27,8 @@ properties = { "BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED=true", "kafka.schemaRegistry.type=INTERNAL", - "DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=test_due_topic", - "METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME=test_mcl_versioned_topic" + "DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=" + Topics.DATAHUB_UPGRADE_HISTORY_TOPIC_NAME, + "METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME=" + Topics.METADATA_CHANGE_LOG_VERSIONED, }, args = {"-u", "SystemUpdateNonBlocking"}) public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTests { diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java index 5c2d6fff0f07c..81d883d8ce36b 100644 --- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java @@ -1,18 +1,15 @@ package com.linkedin.datahub.upgrade; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import com.linkedin.gms.factory.auth.SystemAuthenticationFactory; import com.linkedin.metadata.graph.GraphService; import com.linkedin.metadata.models.registry.ConfigEntityRegistry; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.registry.SchemaRegistryService; +import com.linkedin.metadata.registry.SchemaRegistryServiceImpl; import com.linkedin.metadata.search.SearchService; import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders; +import com.linkedin.mxe.TopicConventionImpl; import io.ebean.Database; -import java.util.Optional; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.context.annotation.Bean; @@ -38,8 +35,6 @@ public class UpgradeCliApplicationTestConfiguration { @Bean public SchemaRegistryService schemaRegistryService() { - SchemaRegistryService mockService = mock(SchemaRegistryService.class); - when(mockService.getSchemaIdForTopic(anyString())).thenReturn(Optional.of(0)); - return mockService; + return new SchemaRegistryServiceImpl(new TopicConventionImpl()); } } diff --git a/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java b/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java index adff32d5d336d..8e92fd70b91dc 100644 --- a/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java +++ b/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java @@ -57,7 +57,7 @@ public class EventUtils { private static final Schema ORIGINAL_MCP_AVRO_SCHEMA = getAvroSchemaFromResource("avro/com/linkedin/mxe/MetadataChangeProposal.avsc"); - public static final Schema ORIGINAL_MCL_AVRO_SCHEMA = + private static final Schema ORIGINAL_MCL_AVRO_SCHEMA = getAvroSchemaFromResource("avro/com/linkedin/mxe/MetadataChangeLog.avsc"); private static final Schema ORIGINAL_FMCL_AVRO_SCHEMA = @@ -84,7 +84,7 @@ public class EventUtils { private static final Schema RENAMED_MCP_AVRO_SCHEMA = com.linkedin.pegasus2avro.mxe.MetadataChangeProposal.SCHEMA$; - private static final Schema RENAMED_MCL_AVRO_SCHEMA = + public static final Schema RENAMED_MCL_AVRO_SCHEMA = com.linkedin.pegasus2avro.mxe.MetadataChangeLog.SCHEMA$; private static final Schema RENAMED_FMCP_AVRO_SCHEMA = diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateSerializer.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateSerializer.java index 14aac2758a69d..0e2bbf2a0d9c9 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateSerializer.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateSerializer.java @@ -5,10 +5,12 @@ import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX; import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.SYSTEM_UPDATE_TOPIC_KEY_PREFIX; +import com.google.common.annotations.VisibleForTesting; import com.linkedin.metadata.EventUtils; import com.linkedin.util.Pair; import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.KafkaAvroSerializer; import java.io.IOException; @@ -26,7 +28,7 @@ public class MockSystemUpdateSerializer extends KafkaAvroSerializer { Map.of( DUHE_SCHEMA_REGISTRY_TOPIC_KEY, new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA), MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY, - new AvroSchema(EventUtils.ORIGINAL_MCL_AVRO_SCHEMA)); + new AvroSchema(EventUtils.RENAMED_MCL_AVRO_SCHEMA)); private Map> topicNameToAvroSchemaMap; @@ -70,6 +72,11 @@ private MockSchemaRegistryClient buildMockSchemaRegistryClient() { return schemaRegistry; } + @VisibleForTesting + public SchemaRegistryClient getSchemaRegistryClient() { + return schemaRegistry; + } + public static String topicToSubjectName(String topicName) { return topicName + DATAHUB_SYSTEM_UPDATE_SUBJECT_SUFFIX; }