From 773bbe767cbd1a6d9eca439aab1dc547958def1b Mon Sep 17 00:00:00 2001
From: David Leifker
Date: Fri, 15 Nov 2024 10:33:50 -0600
Subject: [PATCH] refactor(kafka): reconfigure consumers to allow different
 config

---
 docker/profiles/docker-compose.gms.yml        |  1 -
 .../kafka/MCLKafkaListenerRegistrar.java      |  4 +-
 .../kafka/MetadataChangeEventsProcessor.java  |  4 +-
 .../MetadataChangeProposalsProcessor.java     |  3 +-
 .../datahub/event/PlatformEventProcessor.java |  4 +-
 .../config/kafka/ConsumerConfiguration.java   |  9 ++
 .../config/kafka/KafkaConfiguration.java      |  4 +
 .../src/main/resources/application.yaml       |  7 ++
 .../kafka/KafkaEventConsumerFactory.java      | 88 +++++++++++++++++--
 .../test/SchemaRegistryControllerTest.java    |  7 +-
 10 files changed, 116 insertions(+), 15 deletions(-)

diff --git a/docker/profiles/docker-compose.gms.yml b/docker/profiles/docker-compose.gms.yml
index 147bbd35ff646..824c8024b05d6 100644
--- a/docker/profiles/docker-compose.gms.yml
+++ b/docker/profiles/docker-compose.gms.yml
@@ -40,7 +40,6 @@ x-kafka-env: &kafka-env
   # KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
   SCHEMA_REGISTRY_TYPE: INTERNAL
   KAFKA_SCHEMAREGISTRY_URL: http://datahub-gms:8080/schema-registry/api/
-  SPRING_KAFKA_CONSUMER_AUTO_OFFSET_RESET: ${SPRING_KAFKA_CONSUMER_AUTO_OFFSET_RESET:-earliest}
 
 x-datahub-quickstart-telemetry-env: &datahub-quickstart-telemetry-env
   DATAHUB_SERVER_TYPE: ${DATAHUB_SERVER_TYPE:-quickstart}
diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java
index fb2880f617d30..c909b0034a912 100644
--- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java
+++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java
@@ -1,5 +1,7 @@
 package com.linkedin.metadata.kafka;
 
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCL_EVENT_CONSUMER_NAME;
+
 import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition;
 import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
 import com.linkedin.mxe.Topics;
@@ -39,7 +41,7 @@ public class MCLKafkaListenerRegistrar implements InitializingBean {
   @Autowired private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
 
   @Autowired
-  @Qualifier("kafkaEventConsumer")
+  @Qualifier(MCL_EVENT_CONSUMER_NAME)
   private KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;
 
   @Value("${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}")
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java
index 1b3d19915b439..5d2f6452e6919 100644
--- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java
@@ -1,5 +1,7 @@
 package com.linkedin.metadata.kafka;
 
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME;
+
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
@@ -60,7 +62,7 @@ public class MetadataChangeEventsProcessor {
       "${METADATA_CHANGE_EVENT_NAME:${KAFKA_MCE_TOPIC_NAME:"
           + Topics.METADATA_CHANGE_EVENT
           + "}}",
-      containerFactory = "kafkaEventConsumer")
= "kafkaEventConsumer") + containerFactory = DEFAULT_EVENT_CONSUMER_NAME) @Deprecated public void consume(final ConsumerRecord consumerRecord) { try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) { diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java index 22c2b4b9c0450..ef87afdef46cb 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java @@ -4,6 +4,7 @@ import static com.linkedin.metadata.Constants.MDC_CHANGE_TYPE; import static com.linkedin.metadata.Constants.MDC_ENTITY_TYPE; import static com.linkedin.metadata.Constants.MDC_ENTITY_URN; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCP_EVENT_CONSUMER_NAME; import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; @@ -116,7 +117,7 @@ public void registerConsumerThrottle() { @KafkaListener( id = CONSUMER_GROUP_ID_VALUE, topics = "${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL + "}", - containerFactory = "kafkaEventConsumer") + containerFactory = MCP_EVENT_CONSUMER_NAME) public void consume(final ConsumerRecord consumerRecord) { try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "consume").time()) { kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); diff --git a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java index 358a2ac0c2ee3..5d11697bed93d 100644 --- a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java +++ b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java @@ -1,5 +1,7 @@ package com.datahub.event; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.PE_EVENT_CONSUMER_NAME; + import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; @@ -56,7 +58,7 @@ public PlatformEventProcessor( @KafkaListener( id = "${PLATFORM_EVENT_KAFKA_CONSUMER_GROUP_ID:generic-platform-event-job-client}", topics = {"${PLATFORM_EVENT_TOPIC_NAME:" + Topics.PLATFORM_EVENT + "}"}, - containerFactory = "kafkaEventConsumer") + containerFactory = PE_EVENT_CONSUMER_NAME) public void consume(final ConsumerRecord consumerRecord) { try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) { diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java index 60f3e1b4fef76..9b476483a2baf 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java @@ -8,4 +8,13 @@ public class ConsumerConfiguration { private int maxPartitionFetchBytes; private boolean stopOnDeserializationError; private boolean healthCheckEnabled; + + private ConsumerOptions mcp; + private ConsumerOptions mcl; + private ConsumerOptions pe; + + @Data + public static class ConsumerOptions { + private 
+  }
 }
diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java
index b03aedc1a7b5e..ae0d3a3bb4647 100644
--- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java
+++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java
@@ -20,6 +20,10 @@ public class KafkaConfiguration {
       "spring.deserializer.key.delegate.class";
   public static final String VALUE_DESERIALIZER_DELEGATE_CLASS =
       "spring.deserializer.value.delegate.class";
+  public static final String MCP_EVENT_CONSUMER_NAME = "mcpEventConsumer";
+  public static final String MCL_EVENT_CONSUMER_NAME = "mclEventConsumer";
+  public static final String PE_EVENT_CONSUMER_NAME = "platformEventConsumer";
+  public static final String DEFAULT_EVENT_CONSUMER_NAME = "kafkaEventConsumer";
 
   private String bootstrapServers;
 
diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml
index 8010ae187b6c8..4945b36a251c2 100644
--- a/metadata-service/configuration/src/main/resources/application.yaml
+++ b/metadata-service/configuration/src/main/resources/application.yaml
@@ -289,6 +289,13 @@ kafka:
     maxPartitionFetchBytes: ${KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES:5242880} # the max bytes consumed per partition
     stopOnDeserializationError: ${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:true} # Stops kafka listener container on deserialization error, allows user to fix problems before moving past problematic offset. If false will log and move forward past the offset
     healthCheckEnabled: ${KAFKA_CONSUMER_HEALTH_CHECK_ENABLED:true} # Sets the health indicator to down when a message listener container has stopped due to a deserialization failure, will force consumer apps to restart through k8s and docker-compose health mechanisms
+    mcp:
+      autoOffsetReset: ${KAFKA_CONSUMER_MCP_AUTO_OFFSET_RESET:earliest}
+    mcl:
+      autoOffsetReset: ${KAFKA_CONSUMER_MCL_AUTO_OFFSET_RESET:earliest}
+    pe:
+      autoOffsetReset: ${KAFKA_CONSUMER_PE_AUTO_OFFSET_RESET:latest}
+
   schemaRegistry:
     type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # INTERNAL or KAFKA or AWS_GLUE
     url: ${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
index 750af8ec488df..a1ee4df360b7e 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
@@ -1,10 +1,18 @@
 package com.linkedin.gms.factory.kafka;
 
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME;
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCL_EVENT_CONSUMER_NAME;
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCP_EVENT_CONSUMER_NAME;
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.PE_EVENT_CONSUMER_NAME;
+
 import com.linkedin.gms.factory.config.ConfigurationProvider;
+import com.linkedin.metadata.config.kafka.ConsumerConfiguration;
 import com.linkedin.metadata.config.kafka.KafkaConfiguration;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -23,7 +31,6 @@
 @Slf4j
 @Configuration
 public class KafkaEventConsumerFactory {
-
   private int kafkaEventConsumerConcurrency;
 
   @Bean(name = "kafkaConsumerFactory")
@@ -87,15 +94,82 @@ private static Map<String, Object> buildCustomizedProperties(
     return customizedProperties;
   }
 
-  @Bean(name = "kafkaEventConsumer")
+  @Bean(name = PE_EVENT_CONSUMER_NAME)
+  protected KafkaListenerContainerFactory<?> platformEventConsumer(
+      @Qualifier("kafkaConsumerFactory")
+          DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
+      @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
+
+    return buildDefaultKafkaListenerContainerFactory(
+        PE_EVENT_CONSUMER_NAME,
+        kafkaConsumerFactory,
+        configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(),
+        configurationProvider.getKafka().getConsumer().getPe());
+  }
+
+  @Bean(name = MCP_EVENT_CONSUMER_NAME)
+  protected KafkaListenerContainerFactory<?> mcpEventConsumer(
+      @Qualifier("kafkaConsumerFactory")
+          DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
+      @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
+
+    return buildDefaultKafkaListenerContainerFactory(
+        MCP_EVENT_CONSUMER_NAME,
+        kafkaConsumerFactory,
+        configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(),
+        configurationProvider.getKafka().getConsumer().getMcp());
+  }
+
+  @Bean(name = MCL_EVENT_CONSUMER_NAME)
+  protected KafkaListenerContainerFactory<?> mclEventConsumer(
+      @Qualifier("kafkaConsumerFactory")
+          DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
+      @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
+
+    return buildDefaultKafkaListenerContainerFactory(
+        MCL_EVENT_CONSUMER_NAME,
+        kafkaConsumerFactory,
+        configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(),
+        configurationProvider.getKafka().getConsumer().getMcl());
+  }
+
+  @Bean(name = DEFAULT_EVENT_CONSUMER_NAME)
   protected KafkaListenerContainerFactory<?> kafkaEventConsumer(
       @Qualifier("kafkaConsumerFactory")
           DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
       @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
 
+    return buildDefaultKafkaListenerContainerFactory(
+        DEFAULT_EVENT_CONSUMER_NAME,
+        kafkaConsumerFactory,
+        configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(),
+        null);
+  }
+
+  private KafkaListenerContainerFactory<?> buildDefaultKafkaListenerContainerFactory(
+      String consumerFactoryName,
+      DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
+      boolean isStopOnDeserializationError,
+      @Nullable ConsumerConfiguration.ConsumerOptions consumerOptions) {
+
+    final DefaultKafkaConsumerFactory<String, GenericRecord> factoryWithOverrides;
+    if (consumerOptions != null) {
+      // Copy the base config
+      Map<String, Object> props = new HashMap<>(kafkaConsumerFactory.getConfigurationProperties());
+      // Override just the auto.offset.reset
+      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerOptions.getAutoOffsetReset());
+      factoryWithOverrides =
+          new DefaultKafkaConsumerFactory<>(
+              props,
+              kafkaConsumerFactory.getKeyDeserializer(),
+              kafkaConsumerFactory.getValueDeserializer());
+    } else {
+      factoryWithOverrides = kafkaConsumerFactory;
+    }
+
     ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
         new ConcurrentKafkaListenerContainerFactory<>();
-    factory.setConsumerFactory(kafkaConsumerFactory);
+    factory.setConsumerFactory(factoryWithOverrides);
     factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
     factory.setConcurrency(kafkaEventConsumerConcurrency);
 
@@ -103,7 +177,7 @@ protected KafkaListenerContainerFactory<?> kafkaEventConsumer(
        use DefaultErrorHandler (does back-off retry and then logs) rather than stopping the
        container. Stopping the container prevents lost messages until the error can be examined,
        disabling this will allow progress, but may lose data */
-    if (configurationProvider.getKafka().getConsumer().isStopOnDeserializationError()) {
+    if (isStopOnDeserializationError) {
       CommonDelegatingErrorHandler delegatingErrorHandler =
           new CommonDelegatingErrorHandler(new DefaultErrorHandler());
       delegatingErrorHandler.addDelegate(
@@ -111,9 +185,9 @@ use DefaultErrorHandler (does back-off retry and then logs) rather than stopping
       factory.setCommonErrorHandler(delegatingErrorHandler);
     }
     log.info(
-        String.format(
-            "Event-based KafkaListenerContainerFactory built successfully. Consumer concurrency = %s",
-            kafkaEventConsumerConcurrency));
+        "Event-based {} KafkaListenerContainerFactory built successfully. Consumer concurrency = {}",
+        consumerFactoryName,
+        kafkaEventConsumerConcurrency);
 
     return factory;
   }
diff --git a/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTest.java b/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTest.java
index 664766f204e46..e8deed00672da 100644
--- a/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTest.java
+++ b/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTest.java
@@ -1,6 +1,7 @@
 package io.datahubproject.openapi.test;
 
 import static com.linkedin.metadata.Constants.*;
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME;
 import static org.testng.Assert.*;
 
 import com.linkedin.common.urn.Urn;
@@ -199,7 +200,7 @@ public void testPEConsumption()
   @KafkaListener(
       id = "test-mcp-consumer",
       topics = Topics.METADATA_CHANGE_PROPOSAL,
-      containerFactory = "kafkaEventConsumer",
+      containerFactory = DEFAULT_EVENT_CONSUMER_NAME,
       properties = {"auto.offset.reset:earliest"})
   public void receiveMCP(ConsumerRecord consumerRecord) {
 
@@ -216,7 +217,7 @@ public void receiveMCP(ConsumerRecord consumerRecord) {
   @KafkaListener(
       id = "test-mcl-consumer",
       topics = Topics.METADATA_CHANGE_LOG_VERSIONED,
-      containerFactory = "kafkaEventConsumer",
+      containerFactory = DEFAULT_EVENT_CONSUMER_NAME,
       properties = {"auto.offset.reset:earliest"})
   public void receiveMCL(ConsumerRecord consumerRecord) {
 
@@ -232,7 +233,7 @@ public void receiveMCL(ConsumerRecord consumerRecord) {
   @KafkaListener(
       id = "test-pe-consumer",
       topics = Topics.PLATFORM_EVENT,
-      containerFactory = "kafkaEventConsumer",
+      containerFactory = DEFAULT_EVENT_CONSUMER_NAME,
       properties = {"auto.offset.reset:earliest"})
   public void receivePE(ConsumerRecord consumerRecord) {
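
---

Note for reviewers: the core move in KafkaEventConsumerFactory above is "copy the shared
consumer factory's properties, override only auto.offset.reset, and hand the copy to one
listener container factory." Below is a minimal, self-contained sketch of that pattern using
only stock spring-kafka APIs (DefaultKafkaConsumerFactory, ConcurrentKafkaListenerContainerFactory,
ConsumerConfig). The class name, the String/String generics, the withOffsetReset helper, and the
literal broker/offset values are illustrative assumptions, not part of this patch.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class PerConsumerOffsetResetSketch {

  public static void main(String[] args) {
    // Shared base config, analogous to the "kafkaConsumerFactory" bean (illustrative values).
    Map<String, Object> base = new HashMap<>();
    base.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    base.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    DefaultKafkaConsumerFactory<String, String> baseFactory =
        new DefaultKafkaConsumerFactory<>(
            base, new StringDeserializer(), new StringDeserializer());

    // Build a per-listener container factory whose only divergence is auto.offset.reset,
    // mirroring what buildDefaultKafkaListenerContainerFactory does for the PE consumer.
    ConcurrentKafkaListenerContainerFactory<String, String> peFactory =
        withOffsetReset(baseFactory, "latest");
    System.out.println(
        peFactory
            .getConsumerFactory()
            .getConfigurationProperties()
            .get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); // prints "latest"
  }

  static ConcurrentKafkaListenerContainerFactory<String, String> withOffsetReset(
      DefaultKafkaConsumerFactory<String, String> baseFactory, String offsetReset) {
    // Copy the base config so the shared factory is never mutated.
    Map<String, Object> props = new HashMap<>(baseFactory.getConfigurationProperties());
    // Override just the auto.offset.reset, as the patch does per consumer type.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
    DefaultKafkaConsumerFactory<String, String> overridden =
        new DefaultKafkaConsumerFactory<>(
            props, baseFactory.getKeyDeserializer(), baseFactory.getValueDeserializer());

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(overridden);
    return factory;
  }
}

With this wiring, operators can pin the MCP/MCL consumers to earliest while the platform-event
consumer defaults to latest; setting e.g. KAFKA_CONSUMER_PE_AUTO_OFFSET_RESET=earliest flips
just the PE listener without touching the others.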