refactor(kafka): reconfigure consumers to allow different config #11869

Open · wants to merge 1 commit into base: master
docker/profiles/docker-compose.gms.yml (1 change: 0 additions & 1 deletion)
@@ -40,7 +40,6 @@ x-kafka-env: &kafka-env
   # KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
   SCHEMA_REGISTRY_TYPE: INTERNAL
   KAFKA_SCHEMAREGISTRY_URL: http://datahub-gms:8080/schema-registry/api/
-  SPRING_KAFKA_CONSUMER_AUTO_OFFSET_RESET: ${SPRING_KAFKA_CONSUMER_AUTO_OFFSET_RESET:-earliest}
 
 x-datahub-quickstart-telemetry-env: &datahub-quickstart-telemetry-env
   DATAHUB_SERVER_TYPE: ${DATAHUB_SERVER_TYPE:-quickstart}
MCLKafkaListenerRegistrar.java
@@ -1,5 +1,7 @@
 package com.linkedin.metadata.kafka;
 
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCL_EVENT_CONSUMER_NAME;
+
 import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition;
 import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
 import com.linkedin.mxe.Topics;
@@ -39,7 +41,7 @@ public class MCLKafkaListenerRegistrar implements InitializingBean {
   @Autowired private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
 
   @Autowired
-  @Qualifier("kafkaEventConsumer")
+  @Qualifier(MCL_EVENT_CONSUMER_NAME)
   private KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;
 
   @Value("${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}")
MetadataChangeEventsProcessor.java
@@ -1,5 +1,7 @@
 package com.linkedin.metadata.kafka;
 
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME;
+
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
@@ -60,7 +62,7 @@ public class MetadataChangeEventsProcessor {
           "${METADATA_CHANGE_EVENT_NAME:${KAFKA_MCE_TOPIC_NAME:"
               + Topics.METADATA_CHANGE_EVENT
               + "}}",
-      containerFactory = "kafkaEventConsumer")
+      containerFactory = DEFAULT_EVENT_CONSUMER_NAME)
   @Deprecated
   public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
     try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {
Metadata change proposal (MCP) processor
@@ -4,6 +4,7 @@
 import static com.linkedin.metadata.Constants.MDC_CHANGE_TYPE;
 import static com.linkedin.metadata.Constants.MDC_ENTITY_TYPE;
 import static com.linkedin.metadata.Constants.MDC_ENTITY_URN;
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCP_EVENT_CONSUMER_NAME;
 
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
@@ -116,7 +117,7 @@ public void registerConsumerThrottle() {
   @KafkaListener(
       id = CONSUMER_GROUP_ID_VALUE,
       topics = "${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL + "}",
-      containerFactory = "kafkaEventConsumer")
+      containerFactory = MCP_EVENT_CONSUMER_NAME)
   public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
     try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "consume").time()) {
       kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
PlatformEventProcessor.java
@@ -1,5 +1,7 @@
 package com.datahub.event;
 
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.PE_EVENT_CONSUMER_NAME;
+
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
@@ -56,7 +58,7 @@ public PlatformEventProcessor(
   @KafkaListener(
       id = "${PLATFORM_EVENT_KAFKA_CONSUMER_GROUP_ID:generic-platform-event-job-client}",
       topics = {"${PLATFORM_EVENT_TOPIC_NAME:" + Topics.PLATFORM_EVENT + "}"},
-      containerFactory = "kafkaEventConsumer")
+      containerFactory = PE_EVENT_CONSUMER_NAME)
   public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
     try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {
 
ConsumerConfiguration.java
@@ -8,4 +8,13 @@ public class ConsumerConfiguration {
   private int maxPartitionFetchBytes;
   private boolean stopOnDeserializationError;
   private boolean healthCheckEnabled;
+
+  private ConsumerOptions mcp;
+  private ConsumerOptions mcl;
+  private ConsumerOptions pe;
+
+  @Data
+  public static class ConsumerOptions {
+    private String autoOffsetReset;
+  }
 }
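
Note: with Lombok's @Data generating the accessors, the new nested options are reachable through the same ConfigurationProvider chain this PR uses in KafkaEventConsumerFactory. A minimal access sketch (the surrounding Spring wiring is assumed, not shown in this diff):

    // Sketch only: assumes configurationProvider is bound to the kafka.* subtree
    // shown in the application YAML below.
    ConsumerConfiguration consumer = configurationProvider.getKafka().getConsumer();
    String mcpReset = consumer.getMcp().getAutoOffsetReset(); // "earliest" unless overridden
    String mclReset = consumer.getMcl().getAutoOffsetReset(); // "earliest" unless overridden
    String peReset = consumer.getPe().getAutoOffsetReset();   // "latest" unless overridden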
KafkaConfiguration.java
@@ -20,6 +20,10 @@ public class KafkaConfiguration {
       "spring.deserializer.key.delegate.class";
   public static final String VALUE_DESERIALIZER_DELEGATE_CLASS =
       "spring.deserializer.value.delegate.class";
+  public static final String MCP_EVENT_CONSUMER_NAME = "mcpEventConsumer";
+  public static final String MCL_EVENT_CONSUMER_NAME = "mclEventConsumer";
+  public static final String PE_EVENT_CONSUMER_NAME = "platformEventConsumer";
+  public static final String DEFAULT_EVENT_CONSUMER_NAME = "kafkaEventConsumer";
 
   private String bootstrapServers;
Application configuration YAML (kafka.consumer section)
@@ -289,6 +289,13 @@ kafka:
     maxPartitionFetchBytes: ${KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES:5242880} # the max bytes consumed per partition
     stopOnDeserializationError: ${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:true} # Stops kafka listener container on deserialization error, allows user to fix problems before moving past problematic offset. If false will log and move forward past the offset
     healthCheckEnabled: ${KAFKA_CONSUMER_HEALTH_CHECK_ENABLED:true} # Sets the health indicator to down when a message listener container has stopped due to a deserialization failure, will force consumer apps to restart through k8s and docker-compose health mechanisms
+    mcp:
+      autoOffsetReset: ${KAFKA_CONSUMER_MCP_AUTO_OFFSET_RESET:earliest}
+    mcl:
+      autoOffsetReset: ${KAFKA_CONSUMER_MCL_AUTO_OFFSET_RESET:earliest}
+    pe:
+      autoOffsetReset: ${KAFKA_CONSUMER_PE_AUTO_OFFSET_RESET:latest}
+
   schemaRegistry:
     type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # INTERNAL or KAFKA or AWS_GLUE
     url: ${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}
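
The ${VAR:default} placeholders resolve against the environment at startup. In plain Java, the three new knobs behave like the sketch below; the reading of the defaults is an inference from this PR, not stated in it:

    // Placeholder resolution sketch (Spring does this internally):
    String mcpReset = System.getenv().getOrDefault("KAFKA_CONSUMER_MCP_AUTO_OFFSET_RESET", "earliest");
    String mclReset = System.getenv().getOrDefault("KAFKA_CONSUMER_MCL_AUTO_OFFSET_RESET", "earliest");
    String peReset = System.getenv().getOrDefault("KAFKA_CONSUMER_PE_AUTO_OFFSET_RESET", "latest");
    // mcp/mcl default to "earliest": a consumer group with no committed offset replays
    // metadata changes from the beginning rather than dropping them.
    // pe defaults to "latest": a fresh platform-event consumer skips the backlog,
    // presumably because platform events are transient notifications.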
KafkaEventConsumerFactory.java
@@ -1,10 +1,18 @@
 package com.linkedin.gms.factory.kafka;
 
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME;
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCL_EVENT_CONSUMER_NAME;
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCP_EVENT_CONSUMER_NAME;
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.PE_EVENT_CONSUMER_NAME;
+
 import com.linkedin.gms.factory.config.ConfigurationProvider;
+import com.linkedin.metadata.config.kafka.ConsumerConfiguration;
 import com.linkedin.metadata.config.kafka.KafkaConfiguration;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -23,7 +31,6 @@
 @Slf4j
 @Configuration
 public class KafkaEventConsumerFactory {
-
   private int kafkaEventConsumerConcurrency;
 
   @Bean(name = "kafkaConsumerFactory")
@@ -87,33 +94,100 @@ private static Map<String, Object> buildCustomizedProperties(
     return customizedProperties;
   }
 
-  @Bean(name = "kafkaEventConsumer")
+  @Bean(name = PE_EVENT_CONSUMER_NAME)
+  protected KafkaListenerContainerFactory<?> platformEventConsumer(
+      @Qualifier("kafkaConsumerFactory")
+          DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
+      @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
+
+    return buildDefaultKafkaListenerContainerFactory(
+        PE_EVENT_CONSUMER_NAME,
+        kafkaConsumerFactory,
+        configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(),
+        configurationProvider.getKafka().getConsumer().getPe());
+  }
+
+  @Bean(name = MCP_EVENT_CONSUMER_NAME)
+  protected KafkaListenerContainerFactory<?> mcpEventConsumer(
+      @Qualifier("kafkaConsumerFactory")
+          DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
+      @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
+
+    return buildDefaultKafkaListenerContainerFactory(
+        MCP_EVENT_CONSUMER_NAME,
+        kafkaConsumerFactory,
+        configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(),
+        configurationProvider.getKafka().getConsumer().getMcp());
+  }
+
+  @Bean(name = MCL_EVENT_CONSUMER_NAME)
+  protected KafkaListenerContainerFactory<?> mclEventConsumer(
+      @Qualifier("kafkaConsumerFactory")
+          DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
+      @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
+
+    return buildDefaultKafkaListenerContainerFactory(
+        MCL_EVENT_CONSUMER_NAME,
+        kafkaConsumerFactory,
+        configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(),
+        configurationProvider.getKafka().getConsumer().getMcl());
+  }
+
+  @Bean(name = DEFAULT_EVENT_CONSUMER_NAME)
   protected KafkaListenerContainerFactory<?> kafkaEventConsumer(
       @Qualifier("kafkaConsumerFactory")
           DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
       @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
 
+    return buildDefaultKafkaListenerContainerFactory(
+        DEFAULT_EVENT_CONSUMER_NAME,
+        kafkaConsumerFactory,
+        configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(),
+        null);
+  }
+
+  private KafkaListenerContainerFactory<?> buildDefaultKafkaListenerContainerFactory(
+      String consumerFactoryName,
+      DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
+      boolean isStopOnDeserializationError,
+      @Nullable ConsumerConfiguration.ConsumerOptions consumerOptions) {
+
+    final DefaultKafkaConsumerFactory<String, GenericRecord> factoryWithOverrides;
+    if (consumerOptions != null) {
+      // Copy the base config
+      Map<String, Object> props = new HashMap<>(kafkaConsumerFactory.getConfigurationProperties());
+      // Override just the auto.offset.reset
+      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerOptions.getAutoOffsetReset());
+      factoryWithOverrides =
+          new DefaultKafkaConsumerFactory<>(
+              props,
+              kafkaConsumerFactory.getKeyDeserializer(),
+              kafkaConsumerFactory.getValueDeserializer());
+    } else {
+      factoryWithOverrides = kafkaConsumerFactory;
+    }
+
     ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
         new ConcurrentKafkaListenerContainerFactory<>();
-    factory.setConsumerFactory(kafkaConsumerFactory);
+    factory.setConsumerFactory(factoryWithOverrides);
     factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
     factory.setConcurrency(kafkaEventConsumerConcurrency);
 
     /* Sets up a delegating error handler for Deserialization errors, if disabled will
     use DefaultErrorHandler (does back-off retry and then logs) rather than stopping the container. Stopping the container
     prevents lost messages until the error can be examined, disabling this will allow progress, but may lose data
     */
-    if (configurationProvider.getKafka().getConsumer().isStopOnDeserializationError()) {
+    if (isStopOnDeserializationError) {
       CommonDelegatingErrorHandler delegatingErrorHandler =
           new CommonDelegatingErrorHandler(new DefaultErrorHandler());
       delegatingErrorHandler.addDelegate(
           DeserializationException.class, new CommonContainerStoppingErrorHandler());
       factory.setCommonErrorHandler(delegatingErrorHandler);
     }
     log.info(
-        String.format(
-            "Event-based KafkaListenerContainerFactory built successfully. Consumer concurrency = %s",
-            kafkaEventConsumerConcurrency));
+        "Event-based {} KafkaListenerContainerFactory built successfully. Consumer concurrency = {}",
+        consumerFactoryName,
+        kafkaEventConsumerConcurrency);
 
     return factory;
   }
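
With the shared builder in place, adding another differently-configured consumer reduces to one bean method plus a ConsumerOptions field. A hypothetical sketch ("foo" and getFoo() are stand-ins, not part of this PR):

    // Hypothetical extension of the pattern above; would require a matching
    // ConsumerOptions foo field in ConsumerConfiguration and a YAML entry.
    @Bean(name = "fooEventConsumer")
    protected KafkaListenerContainerFactory<?> fooEventConsumer(
        @Qualifier("kafkaConsumerFactory")
            DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
        @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
      return buildDefaultKafkaListenerContainerFactory(
          "fooEventConsumer",
          kafkaConsumerFactory,
          configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(),
          configurationProvider.getKafka().getConsumer().getFoo());
    }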
OpenAPI Kafka consumer test (io.datahubproject.openapi.test)
@@ -1,6 +1,7 @@
 package io.datahubproject.openapi.test;
 
 import static com.linkedin.metadata.Constants.*;
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME;
 import static org.testng.Assert.*;
 
 import com.linkedin.common.urn.Urn;
@@ -199,7 +200,7 @@ public void testPEConsumption()
   @KafkaListener(
       id = "test-mcp-consumer",
       topics = Topics.METADATA_CHANGE_PROPOSAL,
-      containerFactory = "kafkaEventConsumer",
+      containerFactory = DEFAULT_EVENT_CONSUMER_NAME,
       properties = {"auto.offset.reset:earliest"})
   public void receiveMCP(ConsumerRecord<String, GenericRecord> consumerRecord) {
 
@@ -216,7 +217,7 @@ public void receiveMCP(ConsumerRecord<String, GenericRecord> consumerRecord) {
   @KafkaListener(
       id = "test-mcl-consumer",
       topics = Topics.METADATA_CHANGE_LOG_VERSIONED,
-      containerFactory = "kafkaEventConsumer",
+      containerFactory = DEFAULT_EVENT_CONSUMER_NAME,
       properties = {"auto.offset.reset:earliest"})
   public void receiveMCL(ConsumerRecord<String, GenericRecord> consumerRecord) {
 
@@ -232,7 +233,7 @@ public void receiveMCL(ConsumerRecord<String, GenericRecord> consumerRecord) {
   @KafkaListener(
       id = "test-pe-consumer",
       topics = Topics.PLATFORM_EVENT,
-      containerFactory = "kafkaEventConsumer",
+      containerFactory = DEFAULT_EVENT_CONSUMER_NAME,
       properties = {"auto.offset.reset:earliest"})
   public void receivePE(ConsumerRecord<String, GenericRecord> consumerRecord) {
 