Skip to content

Commit

Permalink
Introduce SEDA in the email connector (RedHatInsights#2286)
Browse files Browse the repository at this point in the history
  • Loading branch information
gwenneg authored Oct 27, 2023
1 parent e9cb69a commit 7d73560
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 5 deletions.
10 changes: 10 additions & 0 deletions .rhcicd/clowdapp-connector-email.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ objects:
value: ${NOTIFICATIONS_CONNECTOR_REDELIVERY_DELAY}
- name: NOTIFICATIONS_CONNECTOR_REDELIVERY_MAX_ATTEMPTS
value: ${NOTIFICATIONS_CONNECTOR_REDELIVERY_MAX_ATTEMPTS}
- name: NOTIFICATIONS_CONNECTOR_SEDA_CONCURRENT_CONSUMERS
value: ${NOTIFICATIONS_CONNECTOR_SEDA_CONCURRENT_CONSUMERS}
- name: NOTIFICATIONS_CONNECTOR_SEDA_QUEUE_SIZE
value: ${NOTIFICATIONS_CONNECTOR_SEDA_QUEUE_SIZE}
- name: NOTIFICATIONS_CONNECTOR_SINGLE_EMAIL_PER_USER_ENABLED
value: ${NOTIFICATIONS_CONNECTOR_SINGLE_EMAIL_PER_USER_ENABLED}
- name: NOTIFICATIONS_CONNECTOR_USER_PROVIDER_BOP_API_TOKEN
Expand Down Expand Up @@ -219,6 +223,12 @@ parameters:
- name: NOTIFICATIONS_CONNECTOR_REDELIVERY_MAX_ATTEMPTS
description: Maximum number of redelivery attempts (initial call not included)
value: "2"
- name: NOTIFICATIONS_CONNECTOR_SEDA_CONCURRENT_CONSUMERS
description: Number of concurrent threads processing exchanges with SEDA
value: "1"
- name: NOTIFICATIONS_CONNECTOR_SEDA_QUEUE_SIZE
description: Maximum capacity of the SEDA queue
value: "1"
- name: NOTIFICATIONS_CONNECTOR_SINGLE_EMAIL_PER_USER_ENABLED
description: Is sending a single email per user enabled?
value: "false"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void configureRoute() throws Exception {
*/
this.configureCaffeineCache();

from(direct(ENGINE_TO_CONNECTOR))
from(seda(ENGINE_TO_CONNECTOR))
.routeId(this.connectorConfig.getConnectorName())
// Initialize the usernames hash set, where we will gather the
// fetched users from the user providers.
Expand Down
5 changes: 5 additions & 0 deletions connector-email/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ notifications.connector.kafka.incoming.topic=${mp.messaging.tocamel.topic}
notifications.connector.kafka.outgoing.topic=${mp.messaging.fromcamel.topic}
notifications.connector.name=email_subscription
notifications.connector.redelivery.counter-name=camel.email.retry.counter
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.concurrent-consumers=20
notifications.connector.seda.enabled=true
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.queue-size=20

quarkus.http.port=9003

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ void testMultipleRecipientSettingsSameUserSendSingleEmail() throws Exception {
successEndpoint.expectedMessageCount(1);

// Send the exchange to the entry point of the email connector.
this.producerTemplate.send(String.format("direct:%s", EngineToConnectorRouteBuilder.ENGINE_TO_CONNECTOR), exchange);
this.producerTemplate.send(String.format("seda:%s", EngineToConnectorRouteBuilder.ENGINE_TO_CONNECTOR), exchange);

successEndpoint.assertIsSatisfied();
// We need a timeout here because SEDA processes the exchange from a different thread and a race condition may happen.
successEndpoint.assertIsSatisfied(2000L);

// Get the exchanges that we sent to BOP. In theory, since we are
// sending an email per user, we should receive many of them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,10 @@ void testMultipleRecipientSettingsSameUserSendSingleEmail() throws Exception {
successEndpoint.expectedMessageCount(1);

// Send the exchange to the entry point of the email connector.
this.producerTemplate.send(String.format("direct:%s", EngineToConnectorRouteBuilder.ENGINE_TO_CONNECTOR), exchange);
this.producerTemplate.send(String.format("seda:%s", EngineToConnectorRouteBuilder.ENGINE_TO_CONNECTOR), exchange);

successEndpoint.assertIsSatisfied();
// We need a timeout here because SEDA processes the exchange from a different thread and a race condition may happen.
successEndpoint.assertIsSatisfied(2000L);

// Get the exchanges that we sent to BOP. In theory, since we are
// sending a single email for multiple users, we should only receive
Expand Down

0 comments on commit 7d73560

Please sign in to comment.