feat(kafka): increase kafka message size and enable compression
david-leifker committed Oct 20, 2023
1 parent 228cfdc commit d8d5ecd
Showing 4 changed files with 26 additions and 18 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -39,7 +39,7 @@ buildscript {
 plugins {
   id 'com.gorylenko.gradle-git-properties' version '2.4.0-rc2'
   id 'com.github.johnrengelman.shadow' version '6.1.0'
-  id 'com.palantir.docker' version '0.35.0'
+  id 'com.palantir.docker' version '0.35.0' apply false
   // https://blog.ltgt.net/javax-jakarta-mess-and-gradle-solution/
   // TODO id "org.gradlex.java-ecosystem-capabilities" version "1.0"
 }
10 changes: 7 additions & 3 deletions datahub-frontend/app/client/KafkaTrackingProducer.java
@@ -1,6 +1,8 @@
 package client;
 
+import com.linkedin.metadata.config.kafka.ProducerConfiguration;
 import com.typesafe.config.Config;
+import config.ConfigurationProvider;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -35,12 +37,12 @@ public class KafkaTrackingProducer {
   private final KafkaProducer<String, String> _producer;
 
   @Inject
-  public KafkaTrackingProducer(@Nonnull Config config, ApplicationLifecycle lifecycle) {
+  public KafkaTrackingProducer(@Nonnull Config config, ApplicationLifecycle lifecycle, final ConfigurationProvider configurationProvider) {
     _isEnabled = !config.hasPath("analytics.enabled") || config.getBoolean("analytics.enabled");
 
     if (_isEnabled) {
       _logger.debug("Analytics tracking is enabled");
-      _producer = createKafkaProducer(config);
+      _producer = createKafkaProducer(config, configurationProvider.getKafka().getProducer());
 
       lifecycle.addStopHook(
           () -> {
@@ -62,13 +64,15 @@ public void send(ProducerRecord<String, String> record) {
     _producer.send(record);
   }
 
-  private static KafkaProducer createKafkaProducer(Config config) {
+  private static KafkaProducer createKafkaProducer(Config config, ProducerConfiguration producerConfiguration) {
     final Properties props = new Properties();
     props.put(ProducerConfig.CLIENT_ID_CONFIG, "datahub-frontend");
     props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.getString("analytics.kafka.delivery.timeout.ms"));
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString("analytics.kafka.bootstrap.server"));
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // Actor urn.
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // JSON object.
+    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, producerConfiguration.getMaxRequestSize());
+    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, producerConfiguration.getCompressionType());
 
     final String securityProtocolConfig = "analytics.kafka.security.protocol";
     if (config.hasPath(securityProtocolConfig)
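For context, the `configurationProvider.getKafka().getProducer()` chain and the two getters used in `createKafkaProducer` imply config POJOs roughly shaped as below. This is a sketch inferred from the diff, not the verbatim classes from the `com.linkedin.metadata.config.kafka` module, which may define additional producer settings.

```java
package com.linkedin.metadata.config.kafka;

import lombok.Data;

// Sketch of the producer config consumed above; field names are inferred from
// the Lombok-generated getters used in the diff (getMaxRequestSize,
// getCompressionType). The real class may carry further settings.
@Data
public class ProducerConfiguration {
  // Producer-side cap on a single request, in bytes (5242880 = 5 MB default).
  private int maxRequestSize;
  // Kafka codec name: "none", "gzip", "snappy", "lz4", or "zstd".
  private String compressionType;
}

// Top-level holder reached via configurationProvider.getKafka(); shown in the
// same listing for brevity, though it would normally live in its own file.
@Data
class KafkaConfiguration {
  private ProducerConfiguration producer;
}
```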
6 changes: 5 additions & 1 deletion datahub-frontend/app/config/ConfigurationProvider.java
@@ -1,6 +1,7 @@
 package config;
 
 import com.linkedin.metadata.config.cache.CacheConfiguration;
+import com.linkedin.metadata.config.kafka.KafkaConfiguration;
 import com.linkedin.metadata.spring.YamlPropertySourceFactory;
 import lombok.Data;
 
@@ -11,14 +12,17 @@

 /**
  * Minimal sharing between metadata-service and frontend
- * Initially for use of client caching configuration.
  * Does not use the factories module to avoid transitive dependencies.
  */
 @EnableConfigurationProperties
 @PropertySource(value = "application.yml", factory = YamlPropertySourceFactory.class)
 @ConfigurationProperties
 @Data
 public class ConfigurationProvider {
+  /**
+   * Kafka related configs.
+   */
+  private KafkaConfiguration kafka;
 
   /**
    * Configuration for caching
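Since `ConfigurationProvider` binds `application.yml` through `@ConfigurationProperties`, the new `kafka` field is presumably populated from a YAML block along these lines. The key names are assumptions that mirror the environment variables documented in the next file, not a verbatim excerpt from the repository.

```yaml
# Illustrative application.yml fragment (assumed, not verbatim): Spring's
# relaxed binding maps kafka.producer.* onto KafkaConfiguration.producer,
# with environment-variable overrides and the documented defaults.
kafka:
  producer:
    maxRequestSize: ${KAFKA_PRODUCER_MAX_REQUEST_SIZE:5242880}
    compressionType: ${KAFKA_PRODUCER_COMPRESSION_TYPE:snappy}
```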
26 changes: 13 additions & 13 deletions docs/deploy/environment-vars.md
@@ -67,19 +67,19 @@ In general, there are **lots** of Kafka configuration environment variables for
These environment variables follow the standard Spring representation of properties as environment variables.
Simply replace the dot, `.`, with an underscore, `_`, and convert to uppercase.

-| Variable | Default | Unit/Type | Components | Description |
-|-----------------------------------------------------|----------------------------------------------|-----------|------------------------------------------|------------------------------------------------------|
-| `KAFKA_LISTENER_CONCURRENCY` | 1 | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Number of Kafka consumer threads. Optimize throughput by matching to topic partitions. |
-| `SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE` | 1048576 | bytes | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
-| `SCHEMA_REGISTRY_TYPE` | `INTERNAL` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry implementation. One of `INTERNAL` or `KAFKA` or `AWS_GLUE` |
-| `KAFKA_SCHEMAREGISTRY_URL` | `http://localhost:8080/schema-registry/api/` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry url. Used for `INTERNAL` and `KAFKA`. The default value is for the `GMS` component. The `MCE Consumer` and `MAE Consumer` should be the `GMS` hostname and port. |
-| `AWS_GLUE_SCHEMA_REGISTRY_REGION` | `us-east-1` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry implementation. |
-| `AWS_GLUE_SCHEMA_REGISTRY_NAME` | `` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry. |
-| `USE_CONFLUENT_SCHEMA_REGISTRY` | `true` | boolean | [`kafka-setup`] | Enable Confluent schema registry configuration. |
-| `KAFKA_PRODUCER_MAX_REQUEST_SIZE` | `5242880` | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
-| `KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES` | `5242880` | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. |
-| `MAX_MESSAGE_BYTES` | `5242880` | integer | [`kafka-setup`] | Sets the max message size on the kakfa topics. |
-| `KAFKA_PRODUCER_COMPRESSION_TYPE` | `snappy` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | The compression used by the producer. |
+| Variable | Default | Unit/Type | Components | Description |
+|-----------------------------------------------------|----------------------------------------------|-----------|----------------------------------------------------|------------------------------------------------------|
+| `KAFKA_LISTENER_CONCURRENCY` | 1 | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Number of Kafka consumer threads. Optimize throughput by matching to topic partitions. |
+| `SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE` | 1048576 | bytes | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
+| `SCHEMA_REGISTRY_TYPE` | `INTERNAL` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry implementation. One of `INTERNAL`, `KAFKA`, or `AWS_GLUE`. |
+| `KAFKA_SCHEMAREGISTRY_URL` | `http://localhost:8080/schema-registry/api/` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry URL. Used for `INTERNAL` and `KAFKA`. The default value is for the `GMS` component. The `MCE Consumer` and `MAE Consumer` should be the `GMS` hostname and port. |
+| `AWS_GLUE_SCHEMA_REGISTRY_REGION` | `us-east-1` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry implementation. |
+| `AWS_GLUE_SCHEMA_REGISTRY_NAME` | `` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry. |
+| `USE_CONFLUENT_SCHEMA_REGISTRY` | `true` | boolean | [`kafka-setup`] | Enable Confluent schema registry configuration. |
+| `KAFKA_PRODUCER_MAX_REQUEST_SIZE` | `5242880` | integer | [`Frontend`, `GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
+| `KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES` | `5242880` | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. |
+| `MAX_MESSAGE_BYTES` | `5242880` | integer | [`kafka-setup`] | Sets the max message size on the Kafka topics. |
+| `KAFKA_PRODUCER_COMPRESSION_TYPE` | `snappy` | string | [`Frontend`, `GMS`, `MCE Consumer`, `MAE Consumer`] | The compression used by the producer. |

## Frontend

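As a usage sketch, the new variables can be raised per container, for example in a docker-compose override; the service names below are assumptions and vary by deployment. The three limits should move together: the producer's max request size must not exceed the topic-level `max.message.bytes`, and the consumer's `max.partition.fetch.bytes` should be at least as large as both.

```yaml
# Illustrative docker-compose override; service names are assumptions.
services:
  datahub-frontend:
    environment:
      - KAFKA_PRODUCER_MAX_REQUEST_SIZE=10485760    # 10 MB
      - KAFKA_PRODUCER_COMPRESSION_TYPE=lz4
  datahub-gms:
    environment:
      - KAFKA_PRODUCER_MAX_REQUEST_SIZE=10485760
      - KAFKA_PRODUCER_COMPRESSION_TYPE=lz4
      - KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES=10485760
  kafka-setup:
    environment:
      - MAX_MESSAGE_BYTES=10485760                  # topic-level max.message.bytes
```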
