Skip to content

Commit

Permalink
[RHCLOUD-28606] Remove deprecated aggregation command (RedHatInsights…
Browse files Browse the repository at this point in the history
  • Loading branch information
g-duval authored Oct 19, 2023
1 parent 038b76f commit 1d320d3
Show file tree
Hide file tree
Showing 14 changed files with 69 additions and 209 deletions.
9 changes: 0 additions & 9 deletions .rhcicd/clowdapp-aggregator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ objects:
- notifications-backend
database:
sharedDbAppName: notifications-backend
kafkaTopics:
- topicName: platform.notifications.aggregation
partitions: 2
replicas: 3
jobs:
- name: cronjob
schedule: ${CRONTAB_AGGREGATION_JOB}
Expand Down Expand Up @@ -61,8 +57,6 @@ objects:
value: ${ENV_NAME}
- name: PROMETHEUS_PUSHGATEWAY_URL
value: ${PROMETHEUS_PUSHGATEWAY}
- name: NOTIFICATIONS_AGGREGATOR_SEND_ON_INGRESS
value: ${NOTIFICATIONS_AGGREGATOR_SEND_ON_INGRESS}
parameters:
- name: CLOUDWATCH_ENABLED
description: Enable Cloudwatch (or not)
Expand Down Expand Up @@ -104,6 +98,3 @@ parameters:
- name: SUSPEND_AGGREGATION_JOB
description: Should the aggregation cron job be disabled?
value: "false"
- name: NOTIFICATIONS_AGGREGATOR_SEND_ON_INGRESS
description: Should send aggregation command on Ingress topic
value: "false"
8 changes: 0 additions & 8 deletions .rhcicd/clowdapp-engine.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ objects:
- topicName: platform.notifications.fromcamel
partitions: 3
replicas: 3
- topicName: platform.notifications.aggregation
partitions: 2
replicas: 3
- topicName: platform.export.requests
partitions: 3
replicas: 3
Expand Down Expand Up @@ -97,8 +94,6 @@ objects:
value: ${ENV_BASE_URL}
- name: ENV_NAME
value: ${ENV_NAME}
- name: MP_MESSAGING_INCOMING_AGGREGATION_THROTTLED_UNPROCESSED_RECORD_MAX_AGE_MS
value: ${MP_MESSAGING_INCOMING_AGGREGATION_THROTTLED_UNPROCESSED_RECORD_MAX_AGE_MS}
- name: MP_MESSAGING_INCOMING_FROMCAMEL_ENABLED
value: ${MP_MESSAGING_INCOMING_FROMCAMEL_ENABLED}
- name: MP_MESSAGING_INCOMING_INGRESS_ENABLED
Expand Down Expand Up @@ -300,9 +295,6 @@ parameters:
value: 250Mi
- name: MIN_REPLICAS
value: "1"
- name: MP_MESSAGING_INCOMING_AGGREGATION_THROTTLED_UNPROCESSED_RECORD_MAX_AGE_MS
description: Max age in milliseconds that an unprocessed message can be before the connector is marked as unhealthy
value: "60000"
- name: MP_MESSAGING_INCOMING_FROMCAMEL_ENABLED
value: "true"
- name: MP_MESSAGING_INCOMING_INGRESS_ENABLED
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.redhat.cloud.notifications;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.redhat.cloud.notifications.config.FeatureFlipper;
import com.redhat.cloud.notifications.db.AggregationOrgConfigRepository;
import com.redhat.cloud.notifications.db.EmailAggregationRepository;
import com.redhat.cloud.notifications.ingress.Action;
Expand All @@ -28,11 +25,8 @@
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static io.quarkus.runtime.LaunchMode.NORMAL;
Expand All @@ -41,7 +35,6 @@
@ApplicationScoped
public class DailyEmailAggregationJob {

public static final String AGGREGATION_CHANNEL = "aggregation";
public static final String EGRESS_CHANNEL = "egress";
public static final String BUNDLE_NAME = "console";
public static final String APP_NAME = "notifications";
Expand All @@ -53,26 +46,16 @@ public class DailyEmailAggregationJob {
@Inject
AggregationOrgConfigRepository aggregationOrgConfigRepository;

@Inject
ObjectMapper objectMapper;

@ConfigProperty(name = "prometheus.pushgateway.url")
String prometheusPushGatewayUrl;

@ConfigProperty(name = "notifications.default.daily.digest.time", defaultValue = "00:00")
LocalTime defaultDailyDigestTime;

@Inject
@Channel(AGGREGATION_CHANNEL)
Emitter<String> emitter;

@Inject
@Channel(EGRESS_CHANNEL)
Emitter<String> emitterIngress;

@Inject
FeatureFlipper featureFlipper;

private Gauge pairsProcessed;

@ActivateRequestContext
Expand All @@ -91,27 +74,7 @@ public void processDailyEmail() {
aggregationOrgConfigRepository.createMissingDefaultConfiguration(defaultDailyDigestTime);
List<AggregationCommand> aggregationCommands = processAggregateEmailsWithOrgPref(now, registry);

List<CompletableFuture<Void>> futures = new ArrayList<>();
for (AggregationCommand aggregationCommand : aggregationCommands) {
try {
if (featureFlipper.isAggregatorSendOnIngress()) {
sendIt(aggregationCommand);
} else {
final String payload = objectMapper.writeValueAsString(aggregationCommand);
futures.add(emitter.send(payload).toCompletableFuture());
}
} catch (JsonProcessingException e) {
Log.warn("Could not transform AggregationCommand to JSON object.", e);
}
}
if (!futures.isEmpty()) {
// resolve completable futures so the Quarkus main thread doesn't stop before everything has been sent
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
} catch (InterruptedException | ExecutionException ie) {
Log.error("Writing AggregationCommands failed", ie);
}
}
aggregationCommands.stream().forEach(aggregationCommand -> sendIt(aggregationCommand));

List<String> orgIdsToUpdate = aggregationCommands.stream().map(agc -> agc.getAggregationKey().getOrgId()).collect(Collectors.toList());
emailAggregationResources.updateLastCronJobRunAccordingOrgPref(orgIdsToUpdate, now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.quarkus.runtime.configuration.ProfileManager;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import static io.quarkus.runtime.LaunchMode.TEST;

Expand Down Expand Up @@ -38,21 +37,8 @@
@ApplicationScoped
public class FeatureFlipper {

@ConfigProperty(name = "notifications.aggregator-send-on-ingress", defaultValue = "false")
boolean aggregatorSendOnIngress;

void logFeaturesStatusAtStartup(@Observes StartupEvent event) {
Log.infof("=== %s startup status ===", FeatureFlipper.class.getSimpleName());
Log.infof("The aggregator send command on ingress is %s", aggregatorSendOnIngress ? "enabled" : "disabled");
}

public boolean isAggregatorSendOnIngress() {
return aggregatorSendOnIngress;
}

public void setAggregatorSendOnIngress(boolean aggregatorSendOnIngress) {
checkTestLaunchMode();
this.aggregatorSendOnIngress = aggregatorSendOnIngress;
}

/**
Expand Down
7 changes: 0 additions & 7 deletions aggregator/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,6 @@ quarkus.datasource.jdbc.url=jdbc:postgresql://127.0.0.1:5432/notifications

quarkus.hibernate-orm.physical-naming-strategy=com.redhat.cloud.notifications.db.naming.SnakeCasePhysicalNamingStrategy

# Output queue
mp.messaging.outgoing.aggregation.connector=smallrye-kafka
mp.messaging.outgoing.aggregation.topic=platform.notifications.aggregation
mp.messaging.outgoing.aggregation.group.id=integrations
mp.messaging.outgoing.aggregation.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.aggregation.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Output queue
mp.messaging.outgoing.egress.connector=smallrye-kafka
mp.messaging.outgoing.egress.topic=platform.notifications.ingress
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.redhat.cloud.notifications.config.FeatureFlipper;
import com.redhat.cloud.notifications.helpers.ResourceHelpers;
import com.redhat.cloud.notifications.ingress.Action;
import com.redhat.cloud.notifications.ingress.Parser;
Expand All @@ -20,8 +19,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.time.LocalDateTime;
import java.time.LocalTime;
Expand Down Expand Up @@ -50,9 +47,6 @@ class DailyEmailAggregationJobTest {
@Any
InMemoryConnector connector;

@Inject
FeatureFlipper featureFlipper;

@Inject
ObjectMapper objectMapper;

Expand All @@ -73,9 +67,7 @@ void setUp() {
@AfterEach
void tearDown() {
helpers.purgeEmailAggregations();
connector.sink(DailyEmailAggregationJob.AGGREGATION_CHANNEL).clear();
connector.sink(DailyEmailAggregationJob.EGRESS_CHANNEL).clear();
featureFlipper.setAggregatorSendOnIngress(false);
}

void initAggregationParameters() {
Expand All @@ -85,25 +77,18 @@ void initAggregationParameters() {

List<AggregationCommand> getRecordsFromKafka() throws JsonProcessingException {
List<AggregationCommand> aggregationCommands = new ArrayList<>();
if (featureFlipper.isAggregatorSendOnIngress()) {
InMemorySink<String> results = connector.sink(DailyEmailAggregationJob.EGRESS_CHANNEL);
for (Message message : results.received()) {
Action action = Parser.decode(String.valueOf(message.getPayload()));
aggregationCommands.add(objectMapper.convertValue(action.getEvents().get(0).getPayload().getAdditionalProperties(), AggregationCommand.class));
}
} else {
InMemorySink<String> results = connector.sink(DailyEmailAggregationJob.AGGREGATION_CHANNEL);
for (Message message : results.received()) {
aggregationCommands.add(objectMapper.readValue(message.getPayload().toString(), AggregationCommand.class));
}

InMemorySink<String> results = connector.sink(DailyEmailAggregationJob.EGRESS_CHANNEL);
for (Message message : results.received()) {
Action action = Parser.decode(String.valueOf(message.getPayload()));
aggregationCommands.add(objectMapper.convertValue(action.getEvents().get(0).getPayload().getAdditionalProperties(), AggregationCommand.class));
}

return aggregationCommands;
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void shouldSentFourAggregationsToKafkaTopic(boolean isSendOnIngressFeatureEnabled) throws JsonProcessingException {
featureFlipper.setAggregatorSendOnIngress(isSendOnIngressFeatureEnabled);
@Test
void shouldSentFourAggregationsToKafkaTopic() throws JsonProcessingException {

helpers.addEmailAggregation("someOrgId", "rhel", "policies", "somePolicyId", "someHostId");
helpers.addEmailAggregation("anotherOrgId", "rhel", "policies", "somePolicyId", "someHostId");
Expand All @@ -121,10 +106,8 @@ void shouldSentFourAggregationsToKafkaTopic(boolean isSendOnIngressFeatureEnable
checkAggCommand(listCommand.get(3), "someOrgId", "rhel", "unknown-application");
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void shouldSentTwoAggregationsToKafkaTopic(boolean isSendOnIngressFeatureEnabled) throws JsonProcessingException {
featureFlipper.setAggregatorSendOnIngress(isSendOnIngressFeatureEnabled);
@Test
void shouldSentTwoAggregationsToKafkaTopic() throws JsonProcessingException {
LocalTime now = LocalTime.now(ZoneOffset.UTC);
helpers.addEmailAggregation("someOrgId", "rhel", "policies", "somePolicyId", "someHostId");
helpers.addEmailAggregation("anotherOrgId", "rhel", "policies", "somePolicyId", "someHostId");
Expand All @@ -146,7 +129,6 @@ void shouldSentTwoAggregationsToKafkaTopic(boolean isSendOnIngressFeatureEnabled
// remove all preferences, and set default hour in the past, nothing should be processed
helpers.purgeAggregationOrgConfig();
testee.setDefaultDailyDigestTime(now.minusHours(2));
connector.sink(DailyEmailAggregationJob.AGGREGATION_CHANNEL).clear();
connector.sink(DailyEmailAggregationJob.EGRESS_CHANNEL).clear();

testee.processDailyEmail();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public Map<String, String> start() {
}

public void setupInMemoryConnector(Map<String, String> props) {
props.putAll(InMemoryConnector.switchOutgoingChannelsToInMemory(DailyEmailAggregationJob.AGGREGATION_CHANNEL));
props.putAll(InMemoryConnector.switchOutgoingChannelsToInMemory(DailyEmailAggregationJob.EGRESS_CHANNEL));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.redhat.cloud.notifications.processors.email;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.redhat.cloud.notifications.config.FeatureFlipper;
import com.redhat.cloud.notifications.db.repositories.ApplicationRepository;
Expand Down Expand Up @@ -38,15 +37,11 @@
import io.micrometer.core.instrument.Timer;
import io.quarkus.logging.Log;
import io.quarkus.qute.TemplateInstance;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.vertx.core.json.JsonObject;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;
import jakarta.inject.Inject;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
Expand All @@ -65,7 +60,6 @@ public class EmailSubscriptionTypeProcessor extends SystemEndpointTypeProcessor

public static final String TOTAL_RECIPIENTS_KEY = "total_recipients";
public static final String TOTAL_FAILURE_RECIPIENTS_KEY = "total_failure_recipients";
public static final String AGGREGATION_CHANNEL = "aggregation";
public static final String AGGREGATION_COMMAND_REJECTED_COUNTER_NAME = "aggregation.command.rejected";
public static final String AGGREGATION_COMMAND_PROCESSED_COUNTER_NAME = "aggregation.command.processed";
public static final String AGGREGATION_COMMAND_ERROR_COUNTER_NAME = "aggregation.command.error";
Expand Down Expand Up @@ -246,38 +240,6 @@ public void processAggregation(Event event) {
});
}

@Incoming(AGGREGATION_CHANNEL)
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
@Blocking
@ActivateRequestContext
public void consumeEmailAggregations(String aggregationCommandJson) {
Timer.Sample consumedTimer = Timer.start(registry);
AggregationCommand aggregationCommand;
try {
aggregationCommand = objectMapper.readValue(aggregationCommandJson, AggregationCommand.class);
} catch (JsonProcessingException e) {
Log.error("Kafka aggregation payload parsing failed", e);
rejectedAggregationCommandCount.increment();
return;
}

Log.infof("Processing received aggregation command: %s", aggregationCommand);
processedAggregationCommandCount.increment();

try {
processAggregateEmailsByAggregationKey(aggregationCommand, Optional.empty());
} catch (Exception e) {
Log.warn("Error while processing aggregation", e);
failedAggregationCommandCount.increment();
} finally {
consumedTimer.stop(registry.timer(
AGGREGATION_CONSUMED_TIMER_NAME,
TAG_KEY_BUNDLE, aggregationCommand.getAggregationKey().getBundle(),
TAG_KEY_APPLICATION, aggregationCommand.getAggregationKey().getApplication()
));
}
}

private void processAggregateEmailsByAggregationKey(AggregationCommand aggregationCommand, Optional<Event> aggregatorEvent) {
TemplateInstance subject = null;
TemplateInstance body = null;
Expand Down
Loading

0 comments on commit 1d320d3

Please sign in to comment.