From 8999fd8691bdb7488ac4f1c7cbdf239121c53a85 Mon Sep 17 00:00:00 2001 From: Gwenneg Lepage Date: Mon, 16 Oct 2023 09:01:18 +0200 Subject: [PATCH 1/2] Lower the aggregation batch size and make it configurable from env var --- .rhcicd/clowdapp-engine.yaml | 3 +++ .../processors/email/EmailAggregator.java | 19 +++++-------------- .../processors/email/EmailAggregatorTest.java | 8 ++++---- 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/.rhcicd/clowdapp-engine.yaml b/.rhcicd/clowdapp-engine.yaml index 9a0ec6ceb6..c0cf9bad28 100644 --- a/.rhcicd/clowdapp-engine.yaml +++ b/.rhcicd/clowdapp-engine.yaml @@ -420,3 +420,6 @@ parameters: - name: NOTIFICATIONS_DRAWER_CONNECTOR_ENABLED description: Is the drawer connector enabled to process them instead of in the engine? value: "false" +- name: NOTIFICATIONS_AGGREGATION_BATCH_SIZE + description: Number of aggregation records loaded from the DB during each aggregation process iteration + value: "100" diff --git a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailAggregator.java b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailAggregator.java index 091c637969..80d53d27ee 100644 --- a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailAggregator.java +++ b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailAggregator.java @@ -1,6 +1,5 @@ package com.redhat.cloud.notifications.processors.email; -import com.redhat.cloud.notifications.config.FeatureFlipper; import com.redhat.cloud.notifications.db.repositories.EmailAggregationRepository; import com.redhat.cloud.notifications.db.repositories.EmailSubscriptionRepository; import com.redhat.cloud.notifications.db.repositories.EndpointRepository; @@ -45,20 +44,12 @@ public class EmailAggregator { @Inject EmailSubscriptionRepository emailSubscriptionRepository; - @Inject - FeatureFlipper featureFlipper; - // This is manually used from the JSON payload instead of converting it to an Action and using getEventType() private static final String EVENT_TYPE_KEY = "event_type"; private static final String RECIPIENTS_KEY = "recipients"; - @ConfigProperty(name = "notifications.get.aggregation.max.page.size", defaultValue = "10000") - int aggregationMaxPageSize; - - private Set getEmailSubscribers(EmailAggregationKey aggregationKey, EmailSubscriptionType emailSubscriptionType) { - return Set.copyOf(emailSubscriptionRepository - .getEmailSubscribersUserId(aggregationKey.getOrgId(), aggregationKey.getBundle(), aggregationKey.getApplication(), emailSubscriptionType)); - } + @ConfigProperty(name = "notifications.aggregation.batch-size", defaultValue = "10000") + int batchSize; private Map> getEmailSubscribersGroupedByEventType(EmailAggregationKey aggregationKey, EmailSubscriptionType emailSubscriptionType) { return emailSubscriptionRepository @@ -84,8 +75,8 @@ public Map> getAggregated(EmailAggregationKey aggregat List aggregations; do { // First, we retrieve paginated aggregations that match the given key. - aggregations = emailAggregationRepository.getEmailAggregation(aggregationKey, start, end, offset, aggregationMaxPageSize); - offset += aggregationMaxPageSize; + aggregations = emailAggregationRepository.getEmailAggregation(aggregationKey, start, end, offset, batchSize); + offset += batchSize; // For each aggregation... for (EmailAggregation aggregation : aggregations) { @@ -122,7 +113,7 @@ public Map> getAggregated(EmailAggregationKey aggregat }); } totalAggregatedElements += aggregations.size(); - } while (aggregationMaxPageSize == aggregations.size()); + } while (batchSize == aggregations.size()); Log.infof("%d elements were aggregated for key %s", totalAggregatedElements, aggregationKey); return aggregated diff --git a/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailAggregatorTest.java b/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailAggregatorTest.java index df4f64a696..8e9573d952 100644 --- a/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailAggregatorTest.java +++ b/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailAggregatorTest.java @@ -67,7 +67,7 @@ class EmailAggregatorTest { @BeforeEach void beforeEach() { - emailAggregator.aggregationMaxPageSize = 5; + emailAggregator.batchSize = 5; } @Test @@ -98,7 +98,7 @@ void shouldTestRecipientsFromSubscription() { // Test user subscription based on event type Map> result = aggregate(); verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), anyInt(), anyInt()); - verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(0), eq(emailAggregator.aggregationMaxPageSize)); + verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(0), eq(emailAggregator.batchSize)); reset(emailAggregationRepository); // just reset mockito counter // nobody subscribed to the right event type yet @@ -108,8 +108,8 @@ void shouldTestRecipientsFromSubscription() { // because after the previous aggregate() call the email_aggregation DB table was not purged, we already have 4 records on database result = aggregate(); verify(emailAggregationRepository, times(2)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), anyInt(), anyInt()); - verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(0), eq(emailAggregator.aggregationMaxPageSize)); - verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(5), eq(emailAggregator.aggregationMaxPageSize)); + verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(0), eq(emailAggregator.batchSize)); + verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(5), eq(emailAggregator.batchSize)); assertEquals(1, result.size()); User user = result.keySet().stream().findFirst().get(); assertTrue(user.getEmail().equals("user-2")); From ae956d2c3df3a4599678b6f77a0610766fa71210 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 17 Oct 2023 00:53:57 +0000 Subject: [PATCH 2/2] Bump org.jacoco:jacoco-maven-plugin from 0.8.10 to 0.8.11 Bumps [org.jacoco:jacoco-maven-plugin](https://github.com/jacoco/jacoco) from 0.8.10 to 0.8.11. - [Release notes](https://github.com/jacoco/jacoco/releases) - [Commits](https://github.com/jacoco/jacoco/compare/v0.8.10...v0.8.11) --- updated-dependencies: - dependency-name: org.jacoco:jacoco-maven-plugin dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 19b9965994..d2a6c56268 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ 3.1.2 3.3.0 3.3.0 - 0.8.10 + 0.8.11 10.12.4 1.19.1