From 1b34c024afd7aeda9475e6d2a6daea253f29efd3 Mon Sep 17 00:00:00 2001
From: Gwenneg Lepage
Date: Mon, 23 Sep 2024 10:04:46 +0200
Subject: [PATCH] RHCLOUD-35363 Process Kafka messages asynchronously

---
 .rhcicd/clowdapp-engine.yaml                | 20 ++++++++
 .../notifications/config/EngineConfig.java  | 51 ++++++++++++++++++-
 .../notifications/events/EventConsumer.java | 46 ++++++++++++++++-
 3 files changed, 113 insertions(+), 4 deletions(-)

diff --git a/.rhcicd/clowdapp-engine.yaml b/.rhcicd/clowdapp-engine.yaml
index 54beb7b320..70cf6989a3 100644
--- a/.rhcicd/clowdapp-engine.yaml
+++ b/.rhcicd/clowdapp-engine.yaml
@@ -114,6 +114,14 @@ objects:
           value: ${NOTIFICATIONS_EMAIL_SENDER_OPENSHIFT_STAGE}
         - name: NOTIFICATIONS_EMAIL_SENDER_OPENSHIFT_PROD
           value: ${NOTIFICATIONS_EMAIL_SENDER_OPENSHIFT_PROD}
+        - name: NOTIFICATIONS_EVENT_CONSUMER_CORE_THREAD_POOL_SIZE
+          value: ${NOTIFICATIONS_EVENT_CONSUMER_CORE_THREAD_POOL_SIZE}
+        - name: NOTIFICATIONS_EVENT_CONSUMER_MAX_THREAD_POOL_SIZE
+          value: ${NOTIFICATIONS_EVENT_CONSUMER_MAX_THREAD_POOL_SIZE}
+        - name: NOTIFICATIONS_EVENT_CONSUMER_KEEP_ALIVE_TIME_SECONDS
+          value: ${NOTIFICATIONS_EVENT_CONSUMER_KEEP_ALIVE_TIME_SECONDS}
+        - name: NOTIFICATIONS_EVENT_CONSUMER_QUEUE_CAPACITY
+          value: ${NOTIFICATIONS_EVENT_CONSUMER_QUEUE_CAPACITY}
         - name: NOTIFICATIONS_KAFKA_CONSUMED_TOTAL_CHECKER_ENABLED
           value: ${KAFKA_CONSUMED_TOTAL_CHECKER_ENABLED}
         - name: NOTIFICATIONS_KAFKA_CONSUMED_TOTAL_CHECKER_INITIAL_DELAY
@@ -257,6 +265,18 @@ parameters:
 - name: NOTIFICATIONS_EMAIL_SENDER_OPENSHIFT_PROD
   description: The email sender address for the OpenShift domain in production.
   value: "\"Red Hat OpenShift\" noreply@redhat.com"
+- name: NOTIFICATIONS_EVENT_CONSUMER_CORE_THREAD_POOL_SIZE
+  description: Number of threads to keep in the pool, even if they are idle.
+  value: "10"
+- name: NOTIFICATIONS_EVENT_CONSUMER_MAX_THREAD_POOL_SIZE
+  description: Maximum number of threads to allow in the pool.
+  value: "10"
+- name: NOTIFICATIONS_EVENT_CONSUMER_KEEP_ALIVE_TIME_SECONDS
+  description: Maximum time that excess idle threads will wait for new tasks before terminating.
+  value: "60"
+- name: NOTIFICATIONS_EVENT_CONSUMER_QUEUE_CAPACITY
+  description: Capacity of the blocking Kafka messages queue.
+  value: "1"
 - name: NOTIFICATIONS_KAFKA_OUTGOING_HIGH_VOLUME_TOPIC_ENABLED
   description: Specifies whether the high volume topic is enabled in the engine or not.
   value: "false"
diff --git a/engine/src/main/java/com/redhat/cloud/notifications/config/EngineConfig.java b/engine/src/main/java/com/redhat/cloud/notifications/config/EngineConfig.java
index a09556d544..71f92aada7 100644
--- a/engine/src/main/java/com/redhat/cloud/notifications/config/EngineConfig.java
+++ b/engine/src/main/java/com/redhat/cloud/notifications/config/EngineConfig.java
@@ -22,12 +22,16 @@ public class EngineConfig {
      */
     private static final String DEFAULT_TEMPLATE = "notifications.use-default-template";
     private static final String EMAILS_ONLY_MODE = "notifications.emails-only-mode.enabled";
+    private static final String EVENT_CONSUMER_CORE_THREAD_POOL_SIZE = "notifications.event-consumer.core-thread-pool-size";
+    private static final String EVENT_CONSUMER_MAX_THREAD_POOL_SIZE = "notifications.event-consumer.max-thread-pool-size";
+    private static final String EVENT_CONSUMER_KEEP_ALIVE_TIME_SECONDS = "notifications.event-consumer.keep-alive-time-seconds";
+    private static final String EVENT_CONSUMER_QUEUE_CAPACITY = "notifications.event-consumer.queue-capacity";
     private static final String SECURED_EMAIL_TEMPLATES = "notifications.use-secured-email-templates.enabled";
     private static final String NOTIFICATIONS_KAFKA_OUTGOING_HIGH_VOLUME_TOPIC_ENABLED = "notifications.kafka.outgoing.high-volume.topic.enabled";
     private static final String KAFKA_TOCAMEL_MAXIMUM_REQUEST_SIZE = "mp.messaging.outgoing.tocamel.max.request.size";
     private static final String UNLEASH = "notifications.unleash.enabled";
-    public static final String PROCESSOR_CONNECTORS_MAX_SERVER_ERRORS = "processor.connectors.max-server-errors";
-    public static final String PROCESSOR_CONNECTORS_MIN_DELAY_SINCE_FIRST_SERVER_ERROR = "processor.connectors.min-delay-since-first-server-error";
+    private static final String PROCESSOR_CONNECTORS_MAX_SERVER_ERRORS = "processor.connectors.max-server-errors";
+    private static final String PROCESSOR_CONNECTORS_MIN_DELAY_SINCE_FIRST_SERVER_ERROR = "processor.connectors.min-delay-since-first-server-error";
 
     /**
      * Standard "Red Hat Hybrid Cloud Console" sender that the vast majority of the
@@ -44,6 +48,7 @@ public class EngineConfig {
      * Unleash configuration
      */
     private String asyncAggregationToggle;
+    private String asyncEventProcessingToggle;
     private String drawerToggle;
     private String kafkaConsumedTotalCheckerToggle;
     private String toggleKafkaOutgoingHighVolumeTopic;
@@ -88,6 +93,18 @@ public class EngineConfig {
     @ConfigProperty(name = EMAILS_ONLY_MODE, defaultValue = "false")
     boolean emailsOnlyModeEnabled;
 
+    @ConfigProperty(name = EVENT_CONSUMER_CORE_THREAD_POOL_SIZE, defaultValue = "10")
+    int eventConsumerCoreThreadPoolSize;
+
+    @ConfigProperty(name = EVENT_CONSUMER_MAX_THREAD_POOL_SIZE, defaultValue = "10")
+    int eventConsumerMaxThreadPoolSize;
+
+    @ConfigProperty(name = EVENT_CONSUMER_KEEP_ALIVE_TIME_SECONDS, defaultValue = "60")
+    long eventConsumerKeepAliveTimeSeconds;
+
+    @ConfigProperty(name = EVENT_CONSUMER_QUEUE_CAPACITY, defaultValue = "1")
+    int eventConsumerQueueCapacity;
+
     // Only used in special environments.
     @ConfigProperty(name = SECURED_EMAIL_TEMPLATES, defaultValue = "false")
     boolean useSecuredEmailTemplates;
@@ -128,6 +145,7 @@ public class EngineConfig {
     @PostConstruct
     void postConstruct() {
         asyncAggregationToggle = toggleRegistry.register("async-aggregation", true);
+        asyncEventProcessingToggle = toggleRegistry.register("async-event-processing", true);
         drawerToggle = toggleRegistry.register("drawer", true);
         kafkaConsumedTotalCheckerToggle = toggleRegistry.register("kafka-consumed-total-checker", true);
         toggleKafkaOutgoingHighVolumeTopic = toggleRegistry.register("kafka-outgoing-high-volume-topic", true);
@@ -140,6 +158,10 @@ void logConfigAtStartup(@Observes Startup event) {
         config.put(DEFAULT_TEMPLATE, isDefaultTemplateEnabled());
         config.put(drawerToggle, isDrawerEnabled());
         config.put(EMAILS_ONLY_MODE, isEmailsOnlyModeEnabled());
+        config.put(EVENT_CONSUMER_CORE_THREAD_POOL_SIZE, eventConsumerCoreThreadPoolSize);
+        config.put(EVENT_CONSUMER_MAX_THREAD_POOL_SIZE, eventConsumerMaxThreadPoolSize);
+        config.put(EVENT_CONSUMER_KEEP_ALIVE_TIME_SECONDS, eventConsumerKeepAliveTimeSeconds);
+        config.put(EVENT_CONSUMER_QUEUE_CAPACITY, eventConsumerQueueCapacity);
         config.put(kafkaConsumedTotalCheckerToggle, isKafkaConsumedTotalCheckerEnabled());
         config.put(KAFKA_TOCAMEL_MAXIMUM_REQUEST_SIZE, getKafkaToCamelMaximumRequestSize());
         config.put(SECURED_EMAIL_TEMPLATES, isSecuredEmailTemplatesEnabled());
@@ -150,6 +172,7 @@ void logConfigAtStartup(@Observes Startup event) {
         config.put(NOTIFICATIONS_EMAIL_SENDER_OPENSHIFT_STAGE, rhOpenshiftSenderStage);
         config.put(NOTIFICATIONS_EMAIL_SENDER_OPENSHIFT_PROD, rhOpenshiftSenderProd);
         config.put(toggleKafkaOutgoingHighVolumeTopic, isOutgoingKafkaHighVolumeTopicEnabled());
+        config.put(asyncEventProcessingToggle, isAsyncEventProcessing());
 
         Log.info("=== Startup configuration ===");
         config.forEach((key, value) -> {
@@ -165,6 +188,14 @@ public boolean isAsyncAggregationEnabled() {
         }
     }
 
+    public boolean isAsyncEventProcessing() {
+        if (unleashEnabled) {
+            return unleash.isEnabled(asyncEventProcessingToggle, false);
+        } else {
+            return false;
+        }
+    }
+
     public boolean isDefaultTemplateEnabled() {
         return defaultTemplateEnabled;
     }
@@ -181,6 +212,22 @@ public boolean isEmailsOnlyModeEnabled() {
         return emailsOnlyModeEnabled;
     }
 
+    public int getEventConsumerCoreThreadPoolSize() {
+        return eventConsumerCoreThreadPoolSize;
+    }
+
+    public int getEventConsumerMaxThreadPoolSize() {
+        return eventConsumerMaxThreadPoolSize;
+    }
+
+    public long getEventConsumerKeepAliveTimeSeconds() {
+        return eventConsumerKeepAliveTimeSeconds;
+    }
+
+    public int getEventConsumerQueueCapacity() {
+        return eventConsumerQueueCapacity;
+    }
+
     public boolean isKafkaConsumedTotalCheckerEnabled() {
         if (unleashEnabled) {
             return unleash.isEnabled(kafkaConsumedTotalCheckerToggle, false);
diff --git a/engine/src/main/java/com/redhat/cloud/notifications/events/EventConsumer.java b/engine/src/main/java/com/redhat/cloud/notifications/events/EventConsumer.java
index 54061d84ba..7aa8f7461b 100644
--- a/engine/src/main/java/com/redhat/cloud/notifications/events/EventConsumer.java
+++ b/engine/src/main/java/com/redhat/cloud/notifications/events/EventConsumer.java
@@ -4,6 +4,7 @@
 import com.redhat.cloud.event.parser.exceptions.ConsoleCloudEventParsingException;
 import com.redhat.cloud.notifications.cloudevent.transformers.CloudEventTransformer;
 import com.redhat.cloud.notifications.cloudevent.transformers.CloudEventTransformerFactory;
+import com.redhat.cloud.notifications.config.EngineConfig;
 import com.redhat.cloud.notifications.db.repositories.EventRepository;
 import com.redhat.cloud.notifications.db.repositories.EventTypeRepository;
 import com.redhat.cloud.notifications.ingress.Action;
@@ -30,9 +31,14 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import static com.redhat.cloud.notifications.events.KafkaMessageDeduplicator.MESSAGE_ID_HEADER;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
 @ApplicationScoped
 public class EventConsumer {
@@ -75,12 +81,16 @@ public class EventConsumer {
     @Inject
     KafkaHeadersExtractor kafkaHeadersExtractor;
 
+    @Inject
+    EngineConfig config;
+
     ConsoleCloudEventParser cloudEventParser = new ConsoleCloudEventParser();
 
     private Counter rejectedCounter;
     private Counter processingErrorCounter;
     private Counter duplicateCounter;
     private Counter processingExceptionCounter;
+    private ExecutorService executor;
 
     @PostConstruct
     public void init() {
@@ -88,12 +98,45 @@ public void init() {
         processingErrorCounter = registry.counter(PROCESSING_ERROR_COUNTER_NAME);
         processingExceptionCounter = registry.counter(PROCESSING_EXCEPTION_COUNTER_NAME);
         duplicateCounter = registry.counter(DUPLICATE_COUNTER_NAME);
+
+        executor = new ThreadPoolExecutor(
+                config.getEventConsumerCoreThreadPoolSize(),
+                config.getEventConsumerMaxThreadPoolSize(),
+                config.getEventConsumerKeepAliveTimeSeconds(),
+                SECONDS,
+                buildBlockingQueue()
+        );
+    }
+
+    private BlockingQueue<Runnable> buildBlockingQueue() {
+        return new LinkedBlockingQueue<>(config.getEventConsumerQueueCapacity()) {
+            @Override
+            public boolean offer(Runnable runnable) {
+                try {
+                    // Block until space is available
+                    put(runnable);
+                    return true;
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+            }
+        };
     }
 
     @Incoming(INGRESS_CHANNEL)
     @Blocking
+    public CompletionStage<Void> consume(Message<String> message) {
+        if (config.isAsyncEventProcessing()) {
+            executor.submit(() -> process(message));
+        } else {
+            process(message);
+        }
+        return message.ack();
+    }
+
     @ActivateRequestContext
-    public CompletionStage<Void> process(Message<String> message) {
+    public void process(Message<String> message) {
         // This timer will have dynamic tag values based on the action parsed from the received message.
         Timer.Sample consumedTimer = Timer.start(registry);
         String payload = message.getPayload();
@@ -220,7 +263,6 @@ public CompletionStage<Void> process(Message<String> message) {
                 TAG_KEY_EVENT_TYPE_FQN, tags.getOrDefault(TAG_KEY_EVENT_TYPE_FQN, "")
             ));
         }
-        return message.ack();
     }
 
     private EventWrapper<?, ?> parsePayload(String payload, Map<String, String> tags) {
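
Editor's note (not part of the patch): the buildBlockingQueue() override above is what turns the executor into a backpressure mechanism. ThreadPoolExecutor hands new tasks to its work queue through the non-blocking offer() call; by delegating offer() to put(), the Kafka consumer thread blocks in consume() whenever the bounded queue is full, so the channel only pulls new messages as workers free up. The standalone sketch below illustrates the same pattern under the Java version used by the project; the names BlockingSubmitExample and newBlockingExecutor are illustrative and not taken from the notifications codebase.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative only: a bounded executor whose submit() blocks the caller
// when the queue is full, mirroring the buildBlockingQueue() override above.
public class BlockingSubmitExample {

    static ExecutorService newBlockingExecutor(int threads, int queueCapacity) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueCapacity) {
            @Override
            public boolean offer(Runnable runnable) {
                try {
                    put(runnable); // block until space is available instead of rejecting
                    return true;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        };
        return new ThreadPoolExecutor(threads, threads, 60L, TimeUnit.SECONDS, queue);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = newBlockingExecutor(2, 1);
        for (int i = 0; i < 10; i++) {
            final int task = i;
            // With 2 workers and a capacity-1 queue, this call blocks once both
            // workers are busy and the queue already holds one pending task.
            executor.submit(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("processed task " + task);
            });
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

Because offer() never returns false in this setup, the pool never grows beyond its core size, which is consistent with the patch defaulting both the core and maximum pool sizes to the same value.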