diff --git a/.github/workflows/docker-unified.yml b/.github/workflows/docker-unified.yml
index 8666a5e2e2171..5f5a62de6288c 100644
--- a/.github/workflows/docker-unified.yml
+++ b/.github/workflows/docker-unified.yml
@@ -851,8 +851,14 @@ jobs:
         if: failure()
         run: |
           docker ps -a
-          docker logs datahub-gms >& gms-${{ matrix.test_strategy }}.log
-          docker logs datahub-actions >& actions-${{ matrix.test_strategy }}.log
+          docker logs datahub-gms >& gms-${{ matrix.test_strategy }}.log || true
+          docker logs datahub-actions >& actions-${{ matrix.test_strategy }}.log || true
+          docker logs datahub-mae-consumer >& mae-${{ matrix.test_strategy }}.log || true
+          docker logs datahub-mce-consumer >& mce-${{ matrix.test_strategy }}.log || true
+          docker logs broker >& broker-${{ matrix.test_strategy }}.log || true
+          docker logs mysql >& mysql-${{ matrix.test_strategy }}.log || true
+          docker logs elasticsearch >& elasticsearch-${{ matrix.test_strategy }}.log || true
+          docker logs datahub-frontend-react >& frontend-${{ matrix.test_strategy }}.log || true
       - name: Upload logs
         uses: actions/upload-artifact@v3
         if: failure()
diff --git a/build.gradle b/build.gradle
index cf55a59cfe694..bd282535fa13c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -39,7 +39,7 @@ buildscript {
 plugins {
   id 'com.gorylenko.gradle-git-properties' version '2.4.0-rc2'
   id 'com.github.johnrengelman.shadow' version '6.1.0'
-  id 'com.palantir.docker' version '0.35.0'
+  id 'com.palantir.docker' version '0.35.0' apply false
   // https://blog.ltgt.net/javax-jakarta-mess-and-gradle-solution/
   // TODO id "org.gradlex.java-ecosystem-capabilities" version "1.0"
 }
diff --git a/datahub-frontend/app/client/KafkaTrackingProducer.java b/datahub-frontend/app/client/KafkaTrackingProducer.java
index fab17f9215d4a..59e91a6d5a0f7 100644
--- a/datahub-frontend/app/client/KafkaTrackingProducer.java
+++ b/datahub-frontend/app/client/KafkaTrackingProducer.java
@@ -1,6 +1,8 @@
 package client;
 
+import com.linkedin.metadata.config.kafka.ProducerConfiguration;
 import com.typesafe.config.Config;
+import config.ConfigurationProvider;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -35,12 +37,12 @@ public class KafkaTrackingProducer {
     private final KafkaProducer<String, String> _producer;
 
     @Inject
-    public KafkaTrackingProducer(@Nonnull Config config, ApplicationLifecycle lifecycle) {
+    public KafkaTrackingProducer(@Nonnull Config config, ApplicationLifecycle lifecycle, final ConfigurationProvider configurationProvider) {
         _isEnabled = !config.hasPath("analytics.enabled") || config.getBoolean("analytics.enabled");
 
         if (_isEnabled) {
             _logger.debug("Analytics tracking is enabled");
-            _producer = createKafkaProducer(config);
+            _producer = createKafkaProducer(config, configurationProvider.getKafka().getProducer());
 
             lifecycle.addStopHook(
                 () -> {
@@ -62,13 +64,15 @@ public void send(ProducerRecord<String, String> record) {
         _producer.send(record);
     }
 
-    private static KafkaProducer<String, String> createKafkaProducer(Config config) {
+    private static KafkaProducer<String, String> createKafkaProducer(Config config, ProducerConfiguration producerConfiguration) {
         final Properties props = new Properties();
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "datahub-frontend");
         props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.getString("analytics.kafka.delivery.timeout.ms"));
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString("analytics.kafka.bootstrap.server"));
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // Actor urn.
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // JSON object.
+        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, producerConfiguration.getMaxRequestSize());
+        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, producerConfiguration.getCompressionType());
 
         final String securityProtocolConfig = "analytics.kafka.security.protocol";
         if (config.hasPath(securityProtocolConfig)
diff --git a/datahub-frontend/app/config/ConfigurationProvider.java b/datahub-frontend/app/config/ConfigurationProvider.java
index 00a5472ec3476..8f526c831b5c9 100644
--- a/datahub-frontend/app/config/ConfigurationProvider.java
+++ b/datahub-frontend/app/config/ConfigurationProvider.java
@@ -1,6 +1,7 @@
 package config;
 
 import com.linkedin.metadata.config.cache.CacheConfiguration;
+import com.linkedin.metadata.config.kafka.KafkaConfiguration;
 import com.linkedin.metadata.spring.YamlPropertySourceFactory;
 import lombok.Data;
 
@@ -11,7 +12,6 @@
 
 /**
  * Minimal sharing between metadata-service and frontend
- * Initially for use of client caching configuration.
  * Does not use the factories module to avoid transitive dependencies.
  */
 @EnableConfigurationProperties
@@ -19,6 +19,10 @@
 @ConfigurationProperties
 @Data
 public class ConfigurationProvider {
+  /**
+   * Kafka related configs.
+   */
+  private KafkaConfiguration kafka;
 
   /**
    * Configuration for caching
diff --git a/docker/broker/env/docker.env b/docker/broker/env/docker.env
index 18115697c2832..6eb958609daf1 100644
--- a/docker/broker/env/docker.env
+++ b/docker/broker/env/docker.env
@@ -5,4 +5,6 @@ KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9
 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
 KAFKA_HEAP_OPTS=-Xms256m -Xmx256m
-KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
\ No newline at end of file
+KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
+KAFKA_MESSAGE_MAX_BYTES=5242880
+KAFKA_MAX_MESSAGE_BYTES=5242880
\ No newline at end of file
diff --git a/docker/datahub-frontend/Dockerfile b/docker/datahub-frontend/Dockerfile
index 9efc0d2ce8753..9c13e73078042 100644
--- a/docker/datahub-frontend/Dockerfile
+++ b/docker/datahub-frontend/Dockerfile
@@ -8,10 +8,12 @@ RUN addgroup -S datahub && adduser -S datahub -G datahub
 # Upgrade Alpine and base packages
 # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762
 RUN apk --no-cache --update-cache --available upgrade \
-    && apk --no-cache add curl sqlite \
+    && apk --no-cache add curl sqlite libc6-compat java-snappy \
    && apk --no-cache add openjdk11-jre --repository=http://dl-cdn.alpinelinux.org/alpine/edge/community \
    && apk --no-cache add jattach --repository http://dl-cdn.alpinelinux.org/alpine/edge/community/
 
+ENV LD_LIBRARY_PATH="/lib:/lib64"
+
 FROM base as prod-install
 COPY ./datahub-frontend.zip /
diff --git a/docker/datahub-gms/Dockerfile b/docker/datahub-gms/Dockerfile
index f5428f7480403..e271188a703cc 100644
--- a/docker/datahub-gms/Dockerfile
+++ b/docker/datahub-gms/Dockerfile
@@ -18,7 +18,7 @@ FROM alpine:3 AS base
 ENV JMX_VERSION=0.18.0
 # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762
 RUN apk --no-cache --update-cache --available upgrade \
-    && apk --no-cache add curl bash coreutils gcompat sqlite \
+    && apk --no-cache add curl bash coreutils gcompat sqlite libc6-compat java-snappy \
    && apk --no-cache add openjdk11-jre --repository=http://dl-cdn.alpinelinux.org/alpine/edge/community \
    && apk --no-cache add jattach --repository http://dl-cdn.alpinelinux.org/alpine/edge/community/ \
    && curl -sS https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-runner/9.4.46.v20220331/jetty-runner-9.4.46.v20220331.jar --output jetty-runner.jar \
@@ -29,6 +29,8 @@ RUN apk --no-cache --update-cache --available upgrade \
    && cp /usr/lib/jvm/java-11-openjdk/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks
 COPY --from=binary /go/bin/dockerize /usr/local/bin
 
+ENV LD_LIBRARY_PATH="/lib:/lib64"
+
 FROM base as prod-install
 COPY war.war /datahub/datahub-gms/bin/war.war
 COPY metadata-models/src/main/resources/entity-registry.yml /datahub/datahub-gms/resources/entity-registry.yml
diff --git a/docker/datahub-mae-consumer/Dockerfile b/docker/datahub-mae-consumer/Dockerfile
index 4b321b1639c1b..ec3da4de71d15 100644
--- a/docker/datahub-mae-consumer/Dockerfile
+++ b/docker/datahub-mae-consumer/Dockerfile
@@ -18,7 +18,7 @@ FROM alpine:3 AS base
 ENV JMX_VERSION=0.18.0
 # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762
 RUN apk --no-cache --update-cache --available upgrade \
-    && apk --no-cache add curl bash coreutils sqlite \
+    && apk --no-cache add curl bash coreutils sqlite libc6-compat java-snappy \
    && apk --no-cache add openjdk11-jre --repository=http://dl-cdn.alpinelinux.org/alpine/edge/community \
    && apk --no-cache add jattach --repository http://dl-cdn.alpinelinux.org/alpine/edge/community/ \
    && wget --no-verbose https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v1.24.0/opentelemetry-javaagent.jar \
@@ -26,6 +26,8 @@ RUN apk --no-cache --update-cache --available upgrade \
    && cp /usr/lib/jvm/java-11-openjdk/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks
 COPY --from=binary /go/bin/dockerize /usr/local/bin
 
+ENV LD_LIBRARY_PATH="/lib:/lib64"
+
 FROM base as prod-install
 COPY mae-consumer-job.jar /datahub/datahub-mae-consumer/bin/
 COPY metadata-models/src/main/resources/entity-registry.yml /datahub/datahub-mae-consumer/resources/entity-registry.yml
diff --git a/docker/datahub-mce-consumer/Dockerfile b/docker/datahub-mce-consumer/Dockerfile
index 4d38ee6daa235..f9c47f77a98f5 100644
--- a/docker/datahub-mce-consumer/Dockerfile
+++ b/docker/datahub-mce-consumer/Dockerfile
@@ -18,7 +18,7 @@ FROM alpine:3 AS base
 ENV JMX_VERSION=0.18.0
 # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762
 RUN apk --no-cache --update-cache --available upgrade \
-    && apk --no-cache add curl bash sqlite \
+    && apk --no-cache add curl bash sqlite libc6-compat java-snappy \
    && apk --no-cache add openjdk11-jre --repository=http://dl-cdn.alpinelinux.org/alpine/edge/community \
    && apk --no-cache add jattach --repository http://dl-cdn.alpinelinux.org/alpine/edge/community/ \
    && wget --no-verbose https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v1.24.0/opentelemetry-javaagent.jar \
@@ -33,6 +33,8 @@ COPY docker/datahub-mce-consumer/start.sh /datahub/datahub-mce-consumer/scripts/
 COPY docker/monitoring/client-prometheus-config.yaml /datahub/datahub-mce-consumer/scripts/prometheus-config.yaml
 RUN chmod +x /datahub/datahub-mce-consumer/scripts/start.sh
 
+ENV LD_LIBRARY_PATH="/lib:/lib64"
+
 FROM base as dev-install
 # Dummy stage for development. Assumes code is built on your machine and mounted to this image.
 # See this excellent thread https://github.com/docker/cli/issues/1134
diff --git a/docker/datahub-upgrade/Dockerfile b/docker/datahub-upgrade/Dockerfile
index 945be54678a24..f08e7268e4018 100644
--- a/docker/datahub-upgrade/Dockerfile
+++ b/docker/datahub-upgrade/Dockerfile
@@ -18,7 +18,7 @@ FROM alpine:3 AS base
 ENV JMX_VERSION=0.18.0
 # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762
 RUN apk --no-cache --update-cache --available upgrade \
-    && apk --no-cache add curl bash coreutils gcompat sqlite \
+    && apk --no-cache add curl bash coreutils gcompat sqlite libc6-compat java-snappy \
    && apk --no-cache add openjdk11-jre --repository=http://dl-cdn.alpinelinux.org/alpine/edge/community \
    && curl -sS https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-runner/9.4.46.v20220331/jetty-runner-9.4.46.v20220331.jar --output jetty-runner.jar \
    && curl -sS https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-jmx/9.4.46.v20220331/jetty-jmx-9.4.46.v20220331.jar --output jetty-jmx.jar \
@@ -28,6 +28,8 @@ RUN apk --no-cache --update-cache --available upgrade \
    && cp /usr/lib/jvm/java-11-openjdk/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks
 COPY --from=binary /go/bin/dockerize /usr/local/bin
 
+ENV LD_LIBRARY_PATH="/lib:/lib64"
+
 FROM base as prod-install
 COPY datahub-upgrade.jar /datahub/datahub-upgrade/bin/
 COPY metadata-models/src/main/resources/entity-registry.yml /datahub/datahub-gms/resources/entity-registry.yml
diff --git a/docker/kafka-setup/kafka-config.sh b/docker/kafka-setup/kafka-config.sh
index 2ba8e2d7c5d47..4d5698ccc3856 100644
--- a/docker/kafka-setup/kafka-config.sh
+++ b/docker/kafka-setup/kafka-config.sh
@@ -2,6 +2,7 @@
 
 : ${PARTITIONS:=1}
 : ${REPLICATION_FACTOR:=1}
+: ${MAX_MESSAGE_BYTES:=5242880}
 
 : ${KAFKA_PROPERTIES_SECURITY_PROTOCOL:=PLAINTEXT}
 
@@ -12,3 +13,4 @@ export KAFKA_HEAP_OPTS="-Xmx64M"
 CONNECTION_PROPERTIES_PATH=/tmp/connection.properties
 
 WORKERS=4
+DELIMITER=";"
diff --git a/docker/kafka-setup/kafka-setup.sh b/docker/kafka-setup/kafka-setup.sh
index b5024e49e59f1..439ffb4d4d829 100755
--- a/docker/kafka-setup/kafka-setup.sh
+++ b/docker/kafka-setup/kafka-setup.sh
@@ -102,24 +102,43 @@ exec 4<&-
 send() {
     work_id=$1
     topic_args=$2
-    echo sending $work_id $topic_args
-    echo "$work_id" "$topic_args" 1>&3 ## the fifo is fd 3
+    topic_config=$3
+
+    echo -e "sending $work_id\n  worker_args: ${topic_args}${DELIMITER}${topic_config}"
+    echo "$work_id" "${topic_args}${DELIMITER}${topic_config}" 1>&3 ## the fifo is fd 3
 }
 
 ## Produce the jobs to run.
-send "$METADATA_AUDIT_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_AUDIT_EVENT_NAME" -send "$METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_EVENT_NAME" -send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_EVENT_NAME" -send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" +send "$METADATA_AUDIT_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_AUDIT_EVENT_NAME" \ + "--entity-type topics --entity-name $METADATA_AUDIT_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" + +send "$METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_EVENT_NAME" \ + "--entity-type topics --entity-name $METADATA_CHANGE_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" +send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_EVENT_NAME" \ + "--entity-type topics --entity-name $FAILED_METADATA_CHANGE_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" + +send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" \ + "--entity-type topics --entity-name $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" # Set retention to 90 days -send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--partitions $PARTITIONS --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" -send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME" -send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" -send "$PLATFORM_EVENT_TOPIC_NAME" "--partitions $PARTITIONS --topic $PLATFORM_EVENT_TOPIC_NAME" +send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--partitions $PARTITIONS --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" \ + "--entity-type topics --entity-name $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" + +send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME" \ + "--entity-type topics --entity-name $METADATA_CHANGE_PROPOSAL_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" +send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" \ + "--entity-type topics --entity-name $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" + +send "$PLATFORM_EVENT_TOPIC_NAME" "--partitions $PARTITIONS --topic $PLATFORM_EVENT_TOPIC_NAME" \ + "--entity-type topics --entity-name $PLATFORM_EVENT_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES" # Infinite retention upgrade topic -send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "--partitions 1 --config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" + # Make sure the retention.ms config for $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME is configured to infinite + # Please see the bug report below for details + # https://github.com/datahub-project/datahub/issues/7882 +send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "--partitions 1 --config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" \ + "--entity-type topics --entity-name 
"$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" --alter --add-config retention.ms=-1" + # Create topic for datahub usage event if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then send "$DATAHUB_USAGE_EVENT_NAME" "--partitions $PARTITIONS --topic $DATAHUB_USAGE_EVENT_NAME" @@ -150,8 +169,3 @@ if [[ $USE_CONFLUENT_SCHEMA_REGISTRY == "TRUE" ]]; then --entity-name _schemas \ --alter --add-config cleanup.policy=compact fi - -# Make sure the retention.ms config for $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME is configured to infinite -# Please see the bug report below for details -# https://github.com/datahub-project/datahub/issues/7882 -kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --entity-type topics --entity-name "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" --alter --add-config retention.ms=-1 diff --git a/docker/kafka-setup/kafka-topic-workers.sh b/docker/kafka-setup/kafka-topic-workers.sh index fd0d45c3f4611..3ddf41abbabf5 100644 --- a/docker/kafka-setup/kafka-topic-workers.sh +++ b/docker/kafka-setup/kafka-topic-workers.sh @@ -11,10 +11,18 @@ START_LOCK=$4 ## the queue workers are supposed to be doing job() { i=$1 - topic_args=$2 + worker_args=$2 + topic_args=$(echo $worker_args | cut -d "$DELIMITER" -f 1) + topic_config=$(echo $worker_args | cut -d "$DELIMITER" -f 2) + + echo " $i: kafka-topics.sh --create --if-not-exist $topic_args" kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \ --replication-factor $REPLICATION_FACTOR \ $topic_args + if [[ ! -z "$topic_config" ]]; then + echo " $i: kafka-configs.sh $topic_config" + kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER $topic_config + fi } ## This is the worker to read from the queue. 
diff --git a/docker/quickstart/docker-compose-m1.quickstart.yml b/docker/quickstart/docker-compose-m1.quickstart.yml
index 89e9aaa0defd6..c5de687d335b9 100644
--- a/docker/quickstart/docker-compose-m1.quickstart.yml
+++ b/docker/quickstart/docker-compose-m1.quickstart.yml
@@ -16,6 +16,8 @@ services:
       - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
       - KAFKA_HEAP_OPTS=-Xms256m -Xmx256m
       - KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
+      - KAFKA_MESSAGE_MAX_BYTES=5242880
+      - KAFKA_MAX_MESSAGE_BYTES=5242880
     healthcheck:
       interval: 1s
       retries: 5
diff --git a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml
index f6284edc83648..b6935f24c5ce2 100644
--- a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml
+++ b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml
@@ -16,6 +16,8 @@ services:
       - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
       - KAFKA_HEAP_OPTS=-Xms256m -Xmx256m
       - KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
+      - KAFKA_MESSAGE_MAX_BYTES=5242880
+      - KAFKA_MAX_MESSAGE_BYTES=5242880
     healthcheck:
       interval: 1s
       retries: 5
diff --git a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml
index 4e3503e35c0db..4ff8bbd70da85 100644
--- a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml
+++ b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml
@@ -16,6 +16,8 @@ services:
       - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
       - KAFKA_HEAP_OPTS=-Xms256m -Xmx256m
       - KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
+      - KAFKA_MESSAGE_MAX_BYTES=5242880
+      - KAFKA_MAX_MESSAGE_BYTES=5242880
     healthcheck:
       interval: 1s
       retries: 5
diff --git a/docker/quickstart/docker-compose.quickstart.yml b/docker/quickstart/docker-compose.quickstart.yml
index e2f52064389e0..f2950ebab2c9d 100644
--- a/docker/quickstart/docker-compose.quickstart.yml
+++ b/docker/quickstart/docker-compose.quickstart.yml
@@ -16,6 +16,8 @@ services:
       - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
       - KAFKA_HEAP_OPTS=-Xms256m -Xmx256m
       - KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
+      - KAFKA_MESSAGE_MAX_BYTES=5242880
+      - KAFKA_MAX_MESSAGE_BYTES=5242880
     healthcheck:
       interval: 1s
       retries: 5
diff --git a/docs/deploy/environment-vars.md b/docs/deploy/environment-vars.md
index 779c3d3d7c432..4c7b249349ca0 100644
--- a/docs/deploy/environment-vars.md
+++ b/docs/deploy/environment-vars.md
@@ -67,15 +67,19 @@ In general, there are **lots** of Kafka configuration environment variables for
 These environment variables follow the standard Spring representation of properties as environment variables.
 Simply replace the dot, `.`, with an underscore, `_`, and convert to uppercase.
 
-| Variable | Default | Unit/Type | Components | Description |
-|----------|---------|-----------|------------|-------------|
-| `KAFKA_LISTENER_CONCURRENCY` | 1 | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Number of Kafka consumer threads. Optimize throughput by matching to topic partitions. |
-| `SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE` | 1048576 | bytes | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
-| `SCHEMA_REGISTRY_TYPE` | `INTERNAL` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry implementation. One of `INTERNAL` or `KAFKA` or `AWS_GLUE` |
-| `KAFKA_SCHEMAREGISTRY_URL` | `http://localhost:8080/schema-registry/api/` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry url. Used for `INTERNAL` and `KAFKA`. The default value is for the `GMS` component. The `MCE Consumer` and `MAE Consumer` should be the `GMS` hostname and port. |
-| `AWS_GLUE_SCHEMA_REGISTRY_REGION` | `us-east-1` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry implementation. |
-| `AWS_GLUE_SCHEMA_REGISTRY_NAME` | `` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry. |
-| `USE_CONFLUENT_SCHEMA_REGISTRY` | `true` | boolean | [`kafka-setup`] | Enable Confluent schema registry configuration. |
+| Variable | Default | Unit/Type | Components | Description |
+|----------|---------|-----------|------------|-------------|
+| `KAFKA_LISTENER_CONCURRENCY` | 1 | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Number of Kafka consumer threads. Optimize throughput by matching to topic partitions. |
+| `SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE` | 1048576 | bytes | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
+| `SCHEMA_REGISTRY_TYPE` | `INTERNAL` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry implementation. One of `INTERNAL` or `KAFKA` or `AWS_GLUE` |
+| `KAFKA_SCHEMAREGISTRY_URL` | `http://localhost:8080/schema-registry/api/` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry url. Used for `INTERNAL` and `KAFKA`. The default value is for the `GMS` component. The `MCE Consumer` and `MAE Consumer` should be the `GMS` hostname and port. |
+| `AWS_GLUE_SCHEMA_REGISTRY_REGION` | `us-east-1` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry implementation. |
+| `AWS_GLUE_SCHEMA_REGISTRY_NAME` | `` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry. |
+| `USE_CONFLUENT_SCHEMA_REGISTRY` | `true` | boolean | [`kafka-setup`] | Enable Confluent schema registry configuration. |
+| `KAFKA_PRODUCER_MAX_REQUEST_SIZE` | `5242880` | integer | [`Frontend`, `GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
+| `KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES` | `5242880` | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. |
+| `MAX_MESSAGE_BYTES` | `5242880` | integer | [`kafka-setup`] | Sets the max message size on the Kafka topics. |
+| `KAFKA_PRODUCER_COMPRESSION_TYPE` | `snappy` | string | [`Frontend`, `GMS`, `MCE Consumer`, `MAE Consumer`] | The compression type used by the producer. |
 
 ## Frontend
diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java
new file mode 100644
index 0000000000000..7a93119226a2d
--- /dev/null
+++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java
@@ -0,0 +1,10 @@
+package com.linkedin.metadata.config.kafka;
+
+import lombok.Data;
+
+
+@Data
+public class ConsumerConfiguration {
+
+  private int maxPartitionFetchBytes;
+}
diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java
index 2966abfc63396..2345f88352c17 100644
--- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java
+++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java
@@ -12,4 +12,6 @@ public class KafkaConfiguration {
   private SchemaRegistryConfiguration schemaRegistry;
 
   private ProducerConfiguration producer;
+
+  private ConsumerConfiguration consumer;
 }
diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ProducerConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ProducerConfiguration.java
index 2bf4cea3f0c18..26a8c6b649133 100644
--- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ProducerConfiguration.java
+++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ProducerConfiguration.java
@@ -13,4 +13,8 @@ public class ProducerConfiguration {
   private int requestTimeout;
 
   private int backoffTimeout;
+
+  private String compressionType;
+
+  private int maxRequestSize;
 }
diff --git a/metadata-service/configuration/src/main/resources/application.yml b/metadata-service/configuration/src/main/resources/application.yml
index 5d72e24748072..b817208672e08 100644
--- a/metadata-service/configuration/src/main/resources/application.yml
+++ b/metadata-service/configuration/src/main/resources/application.yml
@@ -228,6 +228,10 @@ kafka:
     deliveryTimeout: ${KAFKA_PRODUCER_DELIVERY_TIMEOUT:30000}
     requestTimeout: ${KAFKA_PRODUCER_REQUEST_TIMEOUT:3000}
     backoffTimeout: ${KAFKA_PRODUCER_BACKOFF_TIMEOUT:500}
+    compressionType: ${KAFKA_PRODUCER_COMPRESSION_TYPE:snappy} # producer's compression algorithm
+    maxRequestSize: ${KAFKA_PRODUCER_MAX_REQUEST_SIZE:5242880} # the max bytes sent by the producer, also see kafka-setup MAX_MESSAGE_BYTES for matching value
+  consumer:
+    maxPartitionFetchBytes: ${KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES:5242880} # the max bytes consumed per partition
   schemaRegistry:
     type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # INTERNAL or KAFKA or AWS_GLUE
     url: ${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java
index c67a2e704681f..78b3de501e0e5 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java
@@ -59,6 +59,8 @@ public static Map<String, Object> buildProducerProperties(SchemaRegistryConfig s
     props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaConfiguration.getProducer().getDeliveryTimeout());
     props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaConfiguration.getProducer().getRequestTimeout());
     props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaConfiguration.getProducer().getBackoffTimeout());
+    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaConfiguration.getProducer().getCompressionType());
+    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaConfiguration.getProducer().getMaxRequestSize());
 
     // Override KafkaProperties with SchemaRegistryConfig only for non-empty values
     schemaRegistryConfig.getProperties().entrySet()
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
index ba18be6834d14..7a9e80781d639 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
@@ -70,6 +70,7 @@ private static Map<String, Object> buildCustomizedProperties(KafkaProperties bas
     consumerProps.setEnableAutoCommit(true);
     consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));
 
+    // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
     if (kafkaConfiguration.getBootstrapServers() != null && kafkaConfiguration.getBootstrapServers().length() > 0) {
       consumerProps.setBootstrapServers(Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
@@ -84,6 +85,9 @@ private static Map<String, Object> buildCustomizedProperties(KafkaProperties bas
         .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty())
         .forEach(entry -> customizedProperties.put(entry.getKey(), entry.getValue()));
 
+    customizedProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
+        kafkaConfiguration.getConsumer().getMaxPartitionFetchBytes());
+
     return customizedProperties;
   }
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java
index 05ebfdddf8b80..e12cbec87fe45 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java
@@ -4,8 +4,11 @@
 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Map;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@@ -40,10 +43,14 @@ protected KafkaListenerContainerFactory<?> createInstance(@Qualifier("configurat
       consumerProps.setBootstrapServers(Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
     } // else we rely on KafkaProperties which defaults to localhost:9092
 
+    Map<String, Object> customizedProperties = consumerProps.buildProperties();
+    customizedProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
+        kafkaConfiguration.getConsumer().getMaxPartitionFetchBytes());
+
     ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
     factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
-    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties()));
+    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(customizedProperties));
 
     log.info("Simple KafkaListenerContainerFactory built successfully");
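Taken together, the patch aligns three independent size limits that all default to 5242880 bytes (5 MiB): the producer's max.request.size (frontend and GMS), the broker and per-topic max.message.bytes set by the broker env and kafka-setup, and the consumer's max.partition.fetch.bytes (GMS, MAE and MCE consumers). The sketch below restates that contract as plain Kafka client properties. It is illustrative only and mirrors, rather than reproduces, what the factories above do; the bootstrap parameter is a placeholder supplied by the caller.

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class KafkaSizeSettingsSketch {
      // 5 MiB, matching the MAX_MESSAGE_BYTES / KAFKA_PRODUCER_MAX_REQUEST_SIZE defaults above.
      private static final int MAX_MESSAGE_BYTES = 5242880;

      public static Properties producerProps(String bootstrap) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        // Equivalent of what DataHubKafkaProducerFactory now sets from ProducerConfiguration:
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");          // KAFKA_PRODUCER_COMPRESSION_TYPE
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, MAX_MESSAGE_BYTES); // KAFKA_PRODUCER_MAX_REQUEST_SIZE
        return props;
      }

      public static Properties consumerProps(String bootstrap) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        // Equivalent of what the consumer factories now set from ConsumerConfiguration:
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, MAX_MESSAGE_BYTES); // KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES
        return props;
      }
    }

The libc6-compat and java-snappy packages plus the LD_LIBRARY_PATH setting added to the Dockerfiles exist so that the snappy codec chosen here can load its native library on the Alpine-based images; if a deployment switches KAFKA_PRODUCER_COMPRESSION_TYPE to another codec, those additions are harmless but no longer required.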