Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka): increase kafka message size and enable compression #9038

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
cbff901
feat(kafka): increase kafka message size and enable compression
david-leifker Oct 18, 2023
6a05c18
Merge branch 'master' into kafka-message-size-parameters
david-leifker Oct 18, 2023
0669a93
Merge branch 'master' into kafka-message-size-parameters
david-leifker Oct 19, 2023
228cfdc
Merge branch 'master' into kafka-message-size-parameters
david-leifker Oct 20, 2023
d8d5ecd
feat(kafka): increase kafka message size and enable compression
david-leifker Oct 18, 2023
666c3b5
Merge branch 'master' into kafka-message-size-parameters
pedro93 Oct 21, 2023
a96586d
Merge branch 'master' into kafka-message-size-parameters
pedro93 Oct 21, 2023
74908c1
feat(quickstart): quickstart broker max message bytes
david-leifker Oct 21, 2023
a055524
Merge branch 'master' into kafka-message-size-parameters
david-leifker Oct 23, 2023
1bacbbe
add more logs
david-leifker Oct 28, 2023
83479a9
Merge remote-tracking branch 'origin/master' into kafka-message-size-…
david-leifker Oct 28, 2023
d06f7f0
feat(kafka): increase kafka message size and enable compression
david-leifker Oct 18, 2023
78fcfbf
feat(kafka): increase kafka message size and enable compression
david-leifker Oct 18, 2023
ddd46e5
feat(quickstart): quickstart broker max message bytes
david-leifker Oct 21, 2023
8a1bfca
add more logs
david-leifker Oct 28, 2023
98db5ac
Merge branch 'kafka-message-size-parameters' of github.com:david-leif…
david-leifker Oct 28, 2023
1d71670
additional logs
david-leifker Oct 28, 2023
9654c79
add java-snappy for kafka compression
david-leifker Oct 28, 2023
e531a72
add java-snappy for kafka compression
david-leifker Oct 28, 2023
5607543
Merge remote-tracking branch 'upstream/kafka-message-size-parameters'…
david-leifker Oct 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker/kafka-setup/kafka-config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

: ${PARTITIONS:=1}
: ${REPLICATION_FACTOR:=1}
: ${MAX_MESSAGE_BYTES:=5242880}

: ${KAFKA_PROPERTIES_SECURITY_PROTOCOL:=PLAINTEXT}

Expand All @@ -12,3 +13,4 @@ export KAFKA_HEAP_OPTS="-Xmx64M"
CONNECTION_PROPERTIES_PATH=/tmp/connection.properties

WORKERS=4
DELIMITER=";"
46 changes: 30 additions & 16 deletions docker/kafka-setup/kafka-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -100,24 +100,43 @@ exec 4<&-
send() {
work_id=$1
topic_args=$2
echo sending $work_id $topic_args
echo "$work_id" "$topic_args" 1>&3 ## the fifo is fd 3
topic_config=$3

echo -e "sending $work_id\n worker_args: ${topic_args}${DELIMITER}${topic_config}"
echo "$work_id" "${topic_args}${DELIMITER}${topic_config}" 1>&3 ## the fifo is fd 3
}

## Produce the jobs to run.
send "$METADATA_AUDIT_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_AUDIT_EVENT_NAME"
send "$METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_EVENT_NAME"
send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_EVENT_NAME"
send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME"
send "$METADATA_AUDIT_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_AUDIT_EVENT_NAME" \
"--entity-type topics --entity-name $METADATA_AUDIT_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"

send "$METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_EVENT_NAME" \
"--entity-type topics --entity-name $METADATA_CHANGE_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_EVENT_NAME" \
"--entity-type topics --entity-name $FAILED_METADATA_CHANGE_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"

send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" \
"--entity-type topics --entity-name $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"

# Set retention to 90 days
send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--partitions $PARTITIONS --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME"
send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
send "$PLATFORM_EVENT_TOPIC_NAME" "--partitions $PARTITIONS --topic $PLATFORM_EVENT_TOPIC_NAME"
send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--partitions $PARTITIONS --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" \
"--entity-type topics --entity-name $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"

send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME" \
"--entity-type topics --entity-name $METADATA_CHANGE_PROPOSAL_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" \
"--entity-type topics --entity-name $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"

send "$PLATFORM_EVENT_TOPIC_NAME" "--partitions $PARTITIONS --topic $PLATFORM_EVENT_TOPIC_NAME" \
"--entity-type topics --entity-name $PLATFORM_EVENT_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"

# Infinite retention upgrade topic
send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "--partitions 1 --config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME"
# Make sure the retention.ms config for $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME is configured to infinite
# Please see the bug report below for details
# https://github.com/datahub-project/datahub/issues/7882
send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "--partitions 1 --config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" \
"--entity-type topics --entity-name "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" --alter --add-config retention.ms=-1"

# Create topic for datahub usage event
if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then
send "$DATAHUB_USAGE_EVENT_NAME" "--partitions $PARTITIONS --topic $DATAHUB_USAGE_EVENT_NAME"
Expand Down Expand Up @@ -148,8 +167,3 @@ if [[ $USE_CONFLUENT_SCHEMA_REGISTRY == "TRUE" ]]; then
--entity-name _schemas \
--alter --add-config cleanup.policy=compact
fi

# Make sure the retention.ms config for $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME is configured to infinite
# Please see the bug report below for details
# https://github.com/datahub-project/datahub/issues/7882
kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --entity-type topics --entity-name "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" --alter --add-config retention.ms=-1
10 changes: 9 additions & 1 deletion docker/kafka-setup/kafka-topic-workers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,18 @@ START_LOCK=$4
## the queue workers are supposed to be doing
job() {
i=$1
topic_args=$2
worker_args=$2
topic_args=$(echo $worker_args | cut -d "$DELIMITER" -f 1)
topic_config=$(echo $worker_args | cut -d "$DELIMITER" -f 2)

echo " $i: kafka-topics.sh --create --if-not-exist $topic_args"
kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \
--replication-factor $REPLICATION_FACTOR \
$topic_args
if [[ ! -z "$topic_config" ]]; then
echo " $i: kafka-configs.sh $topic_config"
kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER $topic_config
fi
}

## This is the worker to read from the queue.
Expand Down
22 changes: 13 additions & 9 deletions docs/deploy/environment-vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,19 @@ In general, there are **lots** of Kafka configuration environment variables for
These environment variables follow the standard Spring representation of properties as environment variables.
Simply replace the dot, `.`, with an underscore, `_`, and convert to uppercase.

| Variable | Default | Unit/Type | Components | Description |
|-----------------------------------------------------|----------------------------------------------|-----------|-----------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `KAFKA_LISTENER_CONCURRENCY` | 1 | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Number of Kafka consumer threads. Optimize throughput by matching to topic partitions. |
| `SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE` | 1048576 | bytes | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
| `SCHEMA_REGISTRY_TYPE` | `INTERNAL` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry implementation. One of `INTERNAL` or `KAFKA` or `AWS_GLUE` |
| `KAFKA_SCHEMAREGISTRY_URL` | `http://localhost:8080/schema-registry/api/` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry url. Used for `INTERNAL` and `KAFKA`. The default value is for the `GMS` component. The `MCE Consumer` and `MAE Consumer` should be the `GMS` hostname and port. |
| `AWS_GLUE_SCHEMA_REGISTRY_REGION` | `us-east-1` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry implementation. |
| `AWS_GLUE_SCHEMA_REGISTRY_NAME` | `` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry. |
| `USE_CONFLUENT_SCHEMA_REGISTRY` | `true` | boolean | [`kafka-setup`] | Enable Confluent schema registry configuration. |
| Variable | Default | Unit/Type | Components | Description |
|-----------------------------------------------------|----------------------------------------------|-----------|------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `KAFKA_LISTENER_CONCURRENCY` | 1 | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Number of Kafka consumer threads. Optimize throughput by matching to topic partitions. |
| `SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE` | 1048576 | bytes | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
| `SCHEMA_REGISTRY_TYPE` | `INTERNAL` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry implementation. One of `INTERNAL` or `KAFKA` or `AWS_GLUE` |
| `KAFKA_SCHEMAREGISTRY_URL` | `http://localhost:8080/schema-registry/api/` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry url. Used for `INTERNAL` and `KAFKA`. The default value is for the `GMS` component. The `MCE Consumer` and `MAE Consumer` should be the `GMS` hostname and port. |
| `AWS_GLUE_SCHEMA_REGISTRY_REGION` | `us-east-1` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry implementation. |
| `AWS_GLUE_SCHEMA_REGISTRY_NAME` | `` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry. |
| `USE_CONFLUENT_SCHEMA_REGISTRY` | `true` | boolean | [`kafka-setup`] | Enable Confluent schema registry configuration. |
| `KAFKA_PRODUCER_MAX_REQUEST_SIZE` | `5242880` | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
| `KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES` | `5242880` | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. |
| `MAX_MESSAGE_BYTES` | `5242880` | integer | [`kafka-setup`] | Sets the max message size on the kakfa topics. |
| `KAFKA_PRODUCER_COMPRESSION_TYPE` | `snappy` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | The compression used by the producer. |

## Frontend

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.linkedin.metadata.config.kafka;

import lombok.Data;


@Data
public class ConsumerConfiguration {

private int maxPartitionFetchBytes;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ public class KafkaConfiguration {
private SchemaRegistryConfiguration schemaRegistry;

private ProducerConfiguration producer;

private ConsumerConfiguration consumer;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,8 @@ public class ProducerConfiguration {
private int requestTimeout;

private int backoffTimeout;

private String compressionType;

private int maxRequestSize;
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ kafka:
deliveryTimeout: ${KAFKA_PRODUCER_DELIVERY_TIMEOUT:30000}
requestTimeout: ${KAFKA_PRODUCER_REQUEST_TIMEOUT:3000}
backoffTimeout: ${KAFKA_PRODUCER_BACKOFF_TIMEOUT:500}
compressionType: ${KAFKA_PRODUCER_COMPRESSION_TYPE:snappy} # producer's compression algorithm
maxRequestSize: ${KAFKA_PRODUCER_MAX_REQUEST_SIZE:5242880} # the max bytes sent by the producer, also see kafka-setup MAX_MESSAGE_BYTES for matching value
consumer:
maxPartitionFetchBytes: ${KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES:5242880} # the max bytes consumed per partition
schemaRegistry:
type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # INTERNAL or KAFKA or AWS_GLUE
url: ${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public static Map<String, Object> buildProducerProperties(SchemaRegistryConfig s
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaConfiguration.getProducer().getDeliveryTimeout());
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaConfiguration.getProducer().getRequestTimeout());
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaConfiguration.getProducer().getBackoffTimeout());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaConfiguration.getProducer().getCompressionType());
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaConfiguration.getProducer().getMaxRequestSize());

// Override KafkaProperties with SchemaRegistryConfig only for non-empty values
schemaRegistryConfig.getProperties().entrySet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ private static Map<String, Object> buildCustomizedProperties(KafkaProperties bas
consumerProps.setEnableAutoCommit(true);
consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));


// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaConfiguration.getBootstrapServers() != null && kafkaConfiguration.getBootstrapServers().length() > 0) {
consumerProps.setBootstrapServers(Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
Expand All @@ -84,6 +85,9 @@ private static Map<String, Object> buildCustomizedProperties(KafkaProperties bas
.filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty())
.forEach(entry -> customizedProperties.put(entry.getKey(), entry.getValue()));

customizedProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
kafkaConfiguration.getConsumer().getMaxPartitionFetchBytes());

return customizedProperties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
import com.linkedin.gms.factory.config.ConfigurationProvider;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
Expand Down Expand Up @@ -40,10 +43,14 @@ protected KafkaListenerContainerFactory<?> createInstance(@Qualifier("configurat
consumerProps.setBootstrapServers(Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092

Map<String, Object> customizedProperties = consumerProps.buildProperties();
customizedProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
kafkaConfiguration.getConsumer().getMaxPartitionFetchBytes());

ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties()));
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(customizedProperties));

log.info("Simple KafkaListenerContainerFactory built successfully");

Expand Down
Loading