Skip to content

Commit

Permalink
RHCLOUD-35363 Process Kafka messages asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
gwenneg committed Sep 23, 2024
1 parent 19f4f33 commit 1b34c02
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 4 deletions.
20 changes: 20 additions & 0 deletions .rhcicd/clowdapp-engine.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ objects:
value: ${NOTIFICATIONS_EMAIL_SENDER_OPENSHIFT_STAGE}
- name: NOTIFICATIONS_EMAIL_SENDER_OPENSHIFT_PROD
value: ${NOTIFICATIONS_EMAIL_SENDER_OPENSHIFT_PROD}
- name: NOTIFICATIONS_EVENT_CONSUMER_CORE_THREAD_POOL_SIZE
value: ${NOTIFICATIONS_EVENT_CONSUMER_CORE_THREAD_POOL_SIZE}
- name: NOTIFICATIONS_EVENT_CONSUMER_MAX_THREAD_POOL_SIZE
value: ${NOTIFICATIONS_EVENT_CONSUMER_MAX_THREAD_POOL_SIZE}
- name: NOTIFICATIONS_EVENT_CONSUMER_KEEP_ALIVE_TIME_SECONDS
value: ${NOTIFICATIONS_EVENT_CONSUMER_KEEP_ALIVE_TIME_SECONDS}
- name: NOTIFICATIONS_EVENT_CONSUMER_QUEUE_CAPACITY
value: ${NOTIFICATIONS_EVENT_CONSUMER_QUEUE_CAPACITY}
- name: NOTIFICATIONS_KAFKA_CONSUMED_TOTAL_CHECKER_ENABLED
value: ${KAFKA_CONSUMED_TOTAL_CHECKER_ENABLED}
- name: NOTIFICATIONS_KAFKA_CONSUMED_TOTAL_CHECKER_INITIAL_DELAY
Expand Down Expand Up @@ -257,6 +265,18 @@ parameters:
- name: NOTIFICATIONS_EMAIL_SENDER_OPENSHIFT_PROD
description: The email sender address for the OpenShift domain in production.
value: "\"Red Hat OpenShift\" [email protected]"
- name: NOTIFICATIONS_EVENT_CONSUMER_CORE_THREAD_POOL_SIZE
description: Number of threads to keep in the pool, even if they are idle.
value: "10"
- name: NOTIFICATIONS_EVENT_CONSUMER_MAX_THREAD_POOL_SIZE
description: Maximum number of threads to allow in the pool.
value: "10"
- name: NOTIFICATIONS_EVENT_CONSUMER_KEEP_ALIVE_TIME_SECONDS
description: Maximum time that excess idle threads will wait for new tasks before terminating.
value: "60"
- name: NOTIFICATIONS_EVENT_CONSUMER_QUEUE_CAPACITY
description: Capacity of the blocking Kafka messages queue.
value: "1"
- name: NOTIFICATIONS_KAFKA_OUTGOING_HIGH_VOLUME_TOPIC_ENABLED
description: Specifies whether the high volume topic is enabled in the engine or not.
value: "false"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ public class EngineConfig {
*/
private static final String DEFAULT_TEMPLATE = "notifications.use-default-template";
private static final String EMAILS_ONLY_MODE = "notifications.emails-only-mode.enabled";
private static final String EVENT_CONSUMER_CORE_THREAD_POOL_SIZE = "notifications.event-consumer.core-thread-pool-size";
private static final String EVENT_CONSUMER_MAX_THREAD_POOL_SIZE = "notifications.event-consumer.max-thread-pool-size";
private static final String EVENT_CONSUMER_KEEP_ALIVE_TIME_SECONDS = "notifications.event-consumer.keep-alive-time-seconds";
private static final String EVENT_CONSUMER_QUEUE_CAPACITY = "notifications.event-consumer.queue-capacity";
private static final String SECURED_EMAIL_TEMPLATES = "notifications.use-secured-email-templates.enabled";
private static final String NOTIFICATIONS_KAFKA_OUTGOING_HIGH_VOLUME_TOPIC_ENABLED = "notifications.kafka.outgoing.high-volume.topic.enabled";
private static final String KAFKA_TOCAMEL_MAXIMUM_REQUEST_SIZE = "mp.messaging.outgoing.tocamel.max.request.size";
private static final String UNLEASH = "notifications.unleash.enabled";
public static final String PROCESSOR_CONNECTORS_MAX_SERVER_ERRORS = "processor.connectors.max-server-errors";
public static final String PROCESSOR_CONNECTORS_MIN_DELAY_SINCE_FIRST_SERVER_ERROR = "processor.connectors.min-delay-since-first-server-error";
private static final String PROCESSOR_CONNECTORS_MAX_SERVER_ERRORS = "processor.connectors.max-server-errors";
private static final String PROCESSOR_CONNECTORS_MIN_DELAY_SINCE_FIRST_SERVER_ERROR = "processor.connectors.min-delay-since-first-server-error";

/**
* Standard "Red Hat Hybrid Cloud Console" sender that the vast majority of the
Expand All @@ -44,6 +48,7 @@ public class EngineConfig {
* Unleash configuration
*/
private String asyncAggregationToggle;
private String asyncEventProcessingToggle;
private String drawerToggle;
private String kafkaConsumedTotalCheckerToggle;
private String toggleKafkaOutgoingHighVolumeTopic;
Expand Down Expand Up @@ -88,6 +93,18 @@ public class EngineConfig {
@ConfigProperty(name = EMAILS_ONLY_MODE, defaultValue = "false")
boolean emailsOnlyModeEnabled;

@ConfigProperty(name = EVENT_CONSUMER_CORE_THREAD_POOL_SIZE, defaultValue = "10")
int eventConsumerCoreThreadPoolSize;

@ConfigProperty(name = EVENT_CONSUMER_MAX_THREAD_POOL_SIZE, defaultValue = "10")
int eventConsumerMaxThreadPoolSize;

@ConfigProperty(name = EVENT_CONSUMER_KEEP_ALIVE_TIME_SECONDS, defaultValue = "60")
long eventConsumerKeepAliveTimeSeconds;

@ConfigProperty(name = EVENT_CONSUMER_QUEUE_CAPACITY, defaultValue = "1")
int eventConsumerQueueCapacity;

// Only used in special environments.
@ConfigProperty(name = SECURED_EMAIL_TEMPLATES, defaultValue = "false")
boolean useSecuredEmailTemplates;
Expand Down Expand Up @@ -128,6 +145,7 @@ public class EngineConfig {
@PostConstruct
void postConstruct() {
asyncAggregationToggle = toggleRegistry.register("async-aggregation", true);
asyncEventProcessingToggle = toggleRegistry.register("async-event-processing", true);
drawerToggle = toggleRegistry.register("drawer", true);
kafkaConsumedTotalCheckerToggle = toggleRegistry.register("kafka-consumed-total-checker", true);
toggleKafkaOutgoingHighVolumeTopic = toggleRegistry.register("kafka-outgoing-high-volume-topic", true);
Expand All @@ -140,6 +158,10 @@ void logConfigAtStartup(@Observes Startup event) {
config.put(DEFAULT_TEMPLATE, isDefaultTemplateEnabled());
config.put(drawerToggle, isDrawerEnabled());
config.put(EMAILS_ONLY_MODE, isEmailsOnlyModeEnabled());
config.put(EVENT_CONSUMER_CORE_THREAD_POOL_SIZE, eventConsumerCoreThreadPoolSize);
config.put(EVENT_CONSUMER_MAX_THREAD_POOL_SIZE, eventConsumerMaxThreadPoolSize);
config.put(EVENT_CONSUMER_KEEP_ALIVE_TIME_SECONDS, eventConsumerKeepAliveTimeSeconds);
config.put(EVENT_CONSUMER_QUEUE_CAPACITY, eventConsumerQueueCapacity);
config.put(kafkaConsumedTotalCheckerToggle, isKafkaConsumedTotalCheckerEnabled());
config.put(KAFKA_TOCAMEL_MAXIMUM_REQUEST_SIZE, getKafkaToCamelMaximumRequestSize());
config.put(SECURED_EMAIL_TEMPLATES, isSecuredEmailTemplatesEnabled());
Expand All @@ -150,6 +172,7 @@ void logConfigAtStartup(@Observes Startup event) {
config.put(NOTIFICATIONS_EMAIL_SENDER_OPENSHIFT_STAGE, rhOpenshiftSenderStage);
config.put(NOTIFICATIONS_EMAIL_SENDER_OPENSHIFT_PROD, rhOpenshiftSenderProd);
config.put(toggleKafkaOutgoingHighVolumeTopic, isOutgoingKafkaHighVolumeTopicEnabled());
config.put(asyncEventProcessingToggle, isAsyncEventProcessing());

Log.info("=== Startup configuration ===");
config.forEach((key, value) -> {
Expand All @@ -165,6 +188,14 @@ public boolean isAsyncAggregationEnabled() {
}
}

public boolean isAsyncEventProcessing() {
if (unleashEnabled) {
return unleash.isEnabled(asyncEventProcessingToggle, false);
} else {
return false;
}
}

public boolean isDefaultTemplateEnabled() {
return defaultTemplateEnabled;
}
Expand All @@ -181,6 +212,22 @@ public boolean isEmailsOnlyModeEnabled() {
return emailsOnlyModeEnabled;
}

public int getEventConsumerCoreThreadPoolSize() {
return eventConsumerCoreThreadPoolSize;
}

public int getEventConsumerMaxThreadPoolSize() {
return eventConsumerMaxThreadPoolSize;
}

public long getEventConsumerKeepAliveTimeSeconds() {
return eventConsumerKeepAliveTimeSeconds;
}

public int getEventConsumerQueueCapacity() {
return eventConsumerQueueCapacity;
}

public boolean isKafkaConsumedTotalCheckerEnabled() {
if (unleashEnabled) {
return unleash.isEnabled(kafkaConsumedTotalCheckerToggle, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.redhat.cloud.event.parser.exceptions.ConsoleCloudEventParsingException;
import com.redhat.cloud.notifications.cloudevent.transformers.CloudEventTransformer;
import com.redhat.cloud.notifications.cloudevent.transformers.CloudEventTransformerFactory;
import com.redhat.cloud.notifications.config.EngineConfig;
import com.redhat.cloud.notifications.db.repositories.EventRepository;
import com.redhat.cloud.notifications.db.repositories.EventTypeRepository;
import com.redhat.cloud.notifications.ingress.Action;
Expand All @@ -30,9 +31,14 @@
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

import static com.redhat.cloud.notifications.events.KafkaMessageDeduplicator.MESSAGE_ID_HEADER;
import static java.util.concurrent.TimeUnit.SECONDS;

@ApplicationScoped
public class EventConsumer {
Expand Down Expand Up @@ -75,25 +81,62 @@ public class EventConsumer {
@Inject
KafkaHeadersExtractor kafkaHeadersExtractor;

@Inject
EngineConfig config;

ConsoleCloudEventParser cloudEventParser = new ConsoleCloudEventParser();

private Counter rejectedCounter;
private Counter processingErrorCounter;
private Counter duplicateCounter;
private Counter processingExceptionCounter;
private ExecutorService executor;

@PostConstruct
public void init() {
rejectedCounter = registry.counter(REJECTED_COUNTER_NAME);
processingErrorCounter = registry.counter(PROCESSING_ERROR_COUNTER_NAME);
processingExceptionCounter = registry.counter(PROCESSING_EXCEPTION_COUNTER_NAME);
duplicateCounter = registry.counter(DUPLICATE_COUNTER_NAME);

executor = new ThreadPoolExecutor(
config.getEventConsumerCoreThreadPoolSize(),
config.getEventConsumerMaxThreadPoolSize(),
config.getEventConsumerKeepAliveTimeSeconds(),
SECONDS,
buildBlockingQueue()
);
}

private BlockingQueue<Runnable> buildBlockingQueue() {
return new LinkedBlockingQueue<>(config.getEventConsumerQueueCapacity()) {
@Override
public boolean offer(Runnable runnable) {
try {
// Block until space is available
put(runnable);
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
};
}

@Incoming(INGRESS_CHANNEL)
@Blocking
public CompletionStage<Void> consume(Message<String> message) {
if (config.isAsyncEventProcessing()) {
executor.submit(() -> process(message));
} else {
process(message);
}
return message.ack();
}

@ActivateRequestContext
public CompletionStage<Void> process(Message<String> message) {
public void process(Message<String> message) {
// This timer will have dynamic tag values based on the action parsed from the received message.
Timer.Sample consumedTimer = Timer.start(registry);
String payload = message.getPayload();
Expand Down Expand Up @@ -220,7 +263,6 @@ public CompletionStage<Void> process(Message<String> message) {
TAG_KEY_EVENT_TYPE_FQN, tags.getOrDefault(TAG_KEY_EVENT_TYPE_FQN, "")
));
}
return message.ack();
}

private EventWrapper<?, ?> parsePayload(String payload, Map<String, String> tags) {
Expand Down

0 comments on commit 1b34c02

Please sign in to comment.