From e6185eee4c6772cade48859ebc778ca6ca014240 Mon Sep 17 00:00:00 2001 From: wagnerdk <18245675+wagnerdk@users.noreply.github.com> Date: Thu, 25 Jul 2019 14:19:43 +0200 Subject: [PATCH 01/13] Move sending related methods to KafkaMessageSender As describted by https://github.com/UST-MICO/mico/issues/762 we want to split the receiving and the sending logic. --- .../KafkaMessageSender.java | 209 ++++++++++++++++++ .../kafkafaasconnector/MessageListener.java | 184 +-------------- .../MessageListenerTests.java | 13 +- 3 files changed, 224 insertions(+), 182 deletions(-) create mode 100644 src/main/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSender.java diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSender.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSender.java new file mode 100644 index 0000000..c729f4c --- /dev/null +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSender.java @@ -0,0 +1,209 @@ +package io.github.ust.mico.kafkafaasconnector; + +import com.fasterxml.jackson.databind.JsonNode; +import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; +import io.github.ust.mico.kafkafaasconnector.exception.BatchMicoCloudEventException; +import io.github.ust.mico.kafkafaasconnector.exception.MicoCloudEventException; +import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; +import io.github.ust.mico.kafkafaasconnector.kafka.RouteHistory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +@Slf4j +@Service +public class KafkaMessageSender { + + @Autowired + private KafkaTemplate> 
kafkaTemplate; + + @Autowired + private KafkaConfig kafkaConfig; + + + /** + * Send a cloud event using the sendCloudEvent method. + *

+ * This method is safe in the sense that it does not throw exceptions and catches all exceptions during sending. + * + * @param cloudEvent the cloud event to send + * @param originalMessageId the id of the original message + */ + public void safeSendCloudEvent(MicoCloudEventImpl cloudEvent, String originalMessageId) { + try { + this.sendCloudEvent(cloudEvent, originalMessageId); + } catch (MicoCloudEventException e) { + this.safeSendErrorMessage(e.getErrorEvent(), this.kafkaConfig.getInvalidMessageTopic(), originalMessageId); + } catch (BatchMicoCloudEventException e) { + for (MicoCloudEventException error : e.exceptions) { + this.safeSendErrorMessage(error.getErrorEvent(), this.kafkaConfig.getInvalidMessageTopic(), originalMessageId); + } + } catch (Exception e) { + MicoCloudEventException error = new MicoCloudEventException("An error occurred while sending the cloud event.", cloudEvent); + this.safeSendErrorMessage(error.getErrorEvent(), this.kafkaConfig.getInvalidMessageTopic(), originalMessageId); + } + } + + + /** + * Send a cloud event error message using the sendCloudEvent method. + *

+ * This method is safe in the sense that it does not throw exceptions and catches all exceptions during sending. + * + * @param cloudEvent the cloud event to send + * @param topic the kafka topic to send the cloud event to + * @param originalMessageId the id of the original message + */ + public void safeSendErrorMessage(MicoCloudEventImpl cloudEvent, String topic, String originalMessageId) { + try { + this.sendCloudEvent(cloudEvent, topic, originalMessageId); + } catch (Exception e) { + log.error("Failed to process error message. Caused by: {}", e.getMessage()); + } + } + + /** + * Send cloud event to the specified topic. + *

+ * This method also updates the route history of the cloud event before sending. + * + * @param cloudEvent the cloud event to send + * @param topic the kafka topic to send the cloud event to + * @param originalMessageId the id of the original message + */ + private void sendCloudEvent(MicoCloudEventImpl cloudEvent, String topic, String originalMessageId) throws MicoCloudEventException { + try { + cloudEvent = this.updateRouteHistoryWithTopic(cloudEvent, topic); + // TODO commit logic/transactions + setMissingHeaderFields(cloudEvent, originalMessageId); + if (!isTestMessageCompleted(cloudEvent, topic)) { + log.debug("Is not necessary to filter the message. Is test message '{}', filterOutBeforeTopic: '{}', targetTopic: '{}'", cloudEvent.isTestMessage(), cloudEvent.getFilterOutBeforeTopic(), topic); + kafkaTemplate.send(topic, cloudEvent); + } else { + log.info("Filter out test message: '{}' to topic: '{}'", cloudEvent, kafkaConfig.getTestMessageOutputTopic()); + kafkaTemplate.send(kafkaConfig.getTestMessageOutputTopic(), cloudEvent); + } + } catch (Exception e) { + throw new MicoCloudEventException("An error occurred while sending the cloud event.", e, cloudEvent); + } + } + + /** + * This method checks if it is necessary to filter it out. This only works + * for testMessages. It returns {@code true} if {@code isTestMessage} is + * true and the param {@code topic} equals {@code filterOutBeforeTopic}. + * + * @param topic The target topic or destination of this message. + * @return isTestMessage && topic.equals(filterOutBeforeTopic) + */ + public boolean isTestMessageCompleted(MicoCloudEventImpl cloudEvent, String topic) { + return cloudEvent.isTestMessage().orElse(false) && topic.equals(cloudEvent.getFilterOutBeforeTopic().orElse(null)); + } + + /** + * Add a topic routing step to the routing history of the cloud event. 
+ * + * @param cloudEvent the cloud event to update + * @param topic the next topic the event will be sent to + * @return the updated cloud event + */ + private MicoCloudEventImpl updateRouteHistoryWithTopic(MicoCloudEventImpl cloudEvent, String topic) { + return this.updateRouteHistory(cloudEvent, topic, "topic"); + } + + /** + * Update the routing history in the `route` header field of the cloud event. + * + * @param cloudEvent the cloud event to update + * @param id the string id of the next routing step the message will take + * @param type the type of the routing step ("topic" or "faas-function") + * @return the updated cloud event + */ + public MicoCloudEventImpl updateRouteHistory(MicoCloudEventImpl cloudEvent, String id, String type) { + RouteHistory routingStep = new RouteHistory(type, id, ZonedDateTime.now()); + List history = cloudEvent.getRoute().map(route -> new ArrayList<>(route)).orElse(new ArrayList<>()); + history.add(routingStep); + return new MicoCloudEventImpl(cloudEvent).setRoute(history); + } + + /** + * Sets the time, the correlationId and the Id field of a CloudEvent message if missing + * + * @param cloudEvent the cloud event to send + * @param originalMessageId the id of the original message + */ + public void setMissingHeaderFields(MicoCloudEventImpl cloudEvent, String originalMessageId) { + // Add random id if missing + if (StringUtils.isEmpty(cloudEvent.getId())) { + cloudEvent.setRandomId(); + log.debug("Added missing id '{}' to cloud event", cloudEvent.getId()); + } + // Add time if missing + if (!cloudEvent.getTime().isPresent()) { + cloudEvent.setTime(ZonedDateTime.now()); + log.debug("Added missing time '{}' to cloud event", cloudEvent.getTime().orElse(null)); + } + if (!StringUtils.isEmpty(originalMessageId)) { + // Add correlation id if missing + if (!cloudEvent.getCorrelationId().isPresent()) { + cloudEvent.setCorrelationId(originalMessageId); + } + // Set 'created from' to the original message id only if necessary + if 
(!cloudEvent.getId().equals(originalMessageId)) { + if (!cloudEvent.isErrorMessage().orElse(false) || + (cloudEvent.isErrorMessage().orElse(false) && StringUtils.isEmpty(cloudEvent.getCreatedFrom().orElse("")))) { + cloudEvent.setCreatedFrom(originalMessageId); + } + } + } + // Add source if it is an error message, e.g.: kafka://mico/transform-request + if (cloudEvent.isErrorMessage().orElse(false)) { + try { + URI source = new URI("kafka://" + this.kafkaConfig.getGroupId() + "/" + this.kafkaConfig.getInputTopic()); + cloudEvent.setSource(source); + } catch (URISyntaxException e) { + log.error("Could not construct a valid source attribute for the error message. " + + "Caused by: {}", e.getMessage()); + } + } + } + + + /** + * Send cloud event to default topic or topic(s) next in the routingSlip. + * + * @param cloudEvent the cloud event to send + * @param originalMessageId the id of the original message + */ + private void sendCloudEvent(MicoCloudEventImpl cloudEvent, String originalMessageId) throws MicoCloudEventException, BatchMicoCloudEventException { + LinkedList> routingSlip = cloudEvent.getRoutingSlip().orElse(new LinkedList<>()); + if (!routingSlip.isEmpty()) { + List destinations = routingSlip.removeLast(); + ArrayList exceptions = new ArrayList<>(destinations.size()); + for (String topic : destinations) { + try { + this.sendCloudEvent(cloudEvent, topic, originalMessageId); + } catch (MicoCloudEventException e) { + // defer exception handling + exceptions.add(e); + } + } + if (exceptions.size() > 0) { + throw new BatchMicoCloudEventException((MicoCloudEventException[]) exceptions.toArray()); + } + } else { + // default case: + this.sendCloudEvent(cloudEvent, this.kafkaConfig.getOutputTopic(), originalMessageId); + } + } + + +} diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java index 324d93d..a1c0061 100644 --- 
a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java @@ -55,15 +55,15 @@ public class MessageListener { @Autowired private RestTemplate restTemplate; - @Autowired - private KafkaTemplate> kafkaTemplate; - @Autowired private KafkaConfig kafkaConfig; @Autowired private OpenFaaSConfig openFaaSConfig; + @Autowired + private KafkaMessageSender kafkaMessageSender; + /** * Entry point for incoming messages from kafka. * @@ -86,59 +86,21 @@ public void receive(MicoCloudEventImpl cloudEvent) { throw new MicoCloudEventException("CloudEvent has already expired!", cloudEvent); } else if (this.openFaaSConfig.isSkipFunctionCall()) { // when skipping the openFaaS function just pass on the original cloudEvent - this.safeSendCloudEvent(cloudEvent, originalMessageId); + kafkaMessageSender.safeSendCloudEvent(cloudEvent, originalMessageId); } else { String functionResult = callFaasFunction(cloudEvent); ArrayList> events = parseFunctionResult(functionResult, cloudEvent); for (MicoCloudEventImpl event : events) { // individually wrap each cloud event in the results for sending - this.safeSendCloudEvent(event, originalMessageId); + kafkaMessageSender.safeSendCloudEvent(event, originalMessageId); } } } catch (MicoCloudEventException e) { - this.safeSendErrorMessage(e.getErrorEvent(), this.kafkaConfig.getInvalidMessageTopic(), originalMessageId); + kafkaMessageSender.safeSendErrorMessage(e.getErrorEvent(), this.kafkaConfig.getInvalidMessageTopic(), originalMessageId); } } - /** - * Send a cloud event using the sendCloudEvent method. - *

- * This method is safe in the sense that it does not throw exceptions and catches all exceptions during sending. - * - * @param cloudEvent the cloud event to send - * @param originalMessageId the id of the original message - */ - private void safeSendCloudEvent(MicoCloudEventImpl cloudEvent, String originalMessageId) { - try { - this.sendCloudEvent(cloudEvent, originalMessageId); - } catch (MicoCloudEventException e) { - this.safeSendErrorMessage(e.getErrorEvent(), this.kafkaConfig.getInvalidMessageTopic(), originalMessageId); - } catch (BatchMicoCloudEventException e) { - for (MicoCloudEventException error : e.exceptions) { - this.safeSendErrorMessage(error.getErrorEvent(), this.kafkaConfig.getInvalidMessageTopic(), originalMessageId); - } - } catch (Exception e) { - MicoCloudEventException error = new MicoCloudEventException("An error occurred while sending the cloud event.", cloudEvent); - this.safeSendErrorMessage(error.getErrorEvent(), this.kafkaConfig.getInvalidMessageTopic(), originalMessageId); - } - } - /** - * Send a cloud event error message using the sendCloudEvent method. - *

- * This method is safe in the sense that it does not throw exceptions and catches all exceptions during sending. - * - * @param cloudEvent the cloud event to send - * @param topic the kafka topic to send the cloud event to - * @param originalMessageId the id of the original message - */ - private void safeSendErrorMessage(MicoCloudEventImpl cloudEvent, String topic, String originalMessageId) { - try { - this.sendCloudEvent(cloudEvent, topic, originalMessageId); - } catch (Exception e) { - log.error("Failed to process error message. Caused by: {}", e.getMessage()); - } - } /** * Add a function call routing step to the routing history of the cloud event. @@ -148,34 +110,9 @@ private void safeSendErrorMessage(MicoCloudEventImpl cloudEvent, Strin * @return the updated cloud event */ public MicoCloudEventImpl updateRouteHistoryWithFunctionCall(MicoCloudEventImpl cloudEvent, String functionId) { - return this.updateRouteHistory(cloudEvent, functionId, "faas-function"); - } - - /** - * Add a topic routing step to the routing history of the cloud event. - * - * @param cloudEvent the cloud event to update - * @param topic the next topic the event will be sent to - * @return the updated cloud event - */ - public MicoCloudEventImpl updateRouteHistoryWithTopic(MicoCloudEventImpl cloudEvent, String topic) { - return this.updateRouteHistory(cloudEvent, topic, "topic"); + return kafkaMessageSender.updateRouteHistory(cloudEvent, functionId, "faas-function"); } - /** - * Update the routing history in the `route` header field of the cloud event. 
- * - * @param cloudEvent the cloud event to update - * @param id the string id of the next routing step the message will take - * @param type the type of the routing step ("topic" or "faas-function") - * @return the updated cloud event - */ - public MicoCloudEventImpl updateRouteHistory(MicoCloudEventImpl cloudEvent, String id, String type) { - RouteHistory routingStep = new RouteHistory(type, id, ZonedDateTime.now()); - List history = cloudEvent.getRoute().map(route -> new ArrayList<>(route)).orElse(new ArrayList<>()); - history.add(routingStep); - return new MicoCloudEventImpl(cloudEvent).setRoute(history); - } /** * Synchronously call the configured openFaaS function. @@ -219,111 +156,4 @@ public ArrayList> parseFunctionResult(String functi } } - /** - * Send cloud event to default topic or topic(s) next in the routingSlip. - * - * @param cloudEvent the cloud event to send - * @param originalMessageId the id of the original message - */ - private void sendCloudEvent(MicoCloudEventImpl cloudEvent, String originalMessageId) throws MicoCloudEventException, BatchMicoCloudEventException { - LinkedList> routingSlip = cloudEvent.getRoutingSlip().orElse(new LinkedList<>()); - if (!routingSlip.isEmpty()) { - List destinations = routingSlip.removeLast(); - ArrayList exceptions = new ArrayList<>(destinations.size()); - for (String topic : destinations) { - try { - this.sendCloudEvent(cloudEvent, topic, originalMessageId); - } catch (MicoCloudEventException e) { - // defer exception handling - exceptions.add(e); - } - } - if (exceptions.size() > 0) { - throw new BatchMicoCloudEventException((MicoCloudEventException[]) exceptions.toArray()); - } - } else { - // default case: - this.sendCloudEvent(cloudEvent, this.kafkaConfig.getOutputTopic(), originalMessageId); - } - } - - /** - * Send cloud event to the specified topic. - *

- * This method also updates the route history of the cloud event before sending. - * - * @param cloudEvent the cloud event to send - * @param topic the kafka topic to send the cloud event to - * @param originalMessageId the id of the original message - */ - private void sendCloudEvent(MicoCloudEventImpl cloudEvent, String topic, String originalMessageId) throws MicoCloudEventException { - try { - cloudEvent = this.updateRouteHistoryWithTopic(cloudEvent, topic); - // TODO commit logic/transactions - setMissingHeaderFields(cloudEvent, originalMessageId); - if (!isTestMessageCompleted(cloudEvent, topic)) { - log.debug("Is not necessary to filter the message. Is test message '{}', filterOutBeforeTopic: '{}', targetTopic: '{}'", cloudEvent.isTestMessage(), cloudEvent.getFilterOutBeforeTopic(), topic); - kafkaTemplate.send(topic, cloudEvent); - } else { - log.info("Filter out test message: '{}' to topic: '{}'", cloudEvent, kafkaConfig.getTestMessageOutputTopic()); - kafkaTemplate.send(kafkaConfig.getTestMessageOutputTopic(), cloudEvent); - } - } catch (Exception e) { - throw new MicoCloudEventException("An error occurred while sending the cloud event.", e, cloudEvent); - } - } - - /** - * This method checks if it is necessary to filter it out. This only works - * for testMessages. It returns {@code true} if {@code isTestMessage} is - * true and the param {@code topic} equals {@code filterOutBeforeTopic}. - * - * @param topic The target topic or destination of this message. 
- * @return isTestMessage && topic.equals(filterOutBeforeTopic) - */ - public boolean isTestMessageCompleted(MicoCloudEventImpl cloudEvent, String topic) { - return cloudEvent.isTestMessage().orElse(false) && topic.equals(cloudEvent.getFilterOutBeforeTopic().orElse(null)); - } - - /** - * Sets the time, the correlationId and the Id field of a CloudEvent message if missing - * - * @param cloudEvent the cloud event to send - * @param originalMessageId the id of the original message - */ - public void setMissingHeaderFields(MicoCloudEventImpl cloudEvent, String originalMessageId) { - // Add random id if missing - if (StringUtils.isEmpty(cloudEvent.getId())) { - cloudEvent.setRandomId(); - log.debug("Added missing id '{}' to cloud event", cloudEvent.getId()); - } - // Add time if missing - if (!cloudEvent.getTime().isPresent()) { - cloudEvent.setTime(ZonedDateTime.now()); - log.debug("Added missing time '{}' to cloud event", cloudEvent.getTime().orElse(null)); - } - if (!StringUtils.isEmpty(originalMessageId)) { - // Add correlation id if missing - if (!cloudEvent.getCorrelationId().isPresent()) { - cloudEvent.setCorrelationId(originalMessageId); - } - // Set 'created from' to the original message id only if necessary - if (!cloudEvent.getId().equals(originalMessageId)) { - if (!cloudEvent.isErrorMessage().orElse(false) || - (cloudEvent.isErrorMessage().orElse(false) && StringUtils.isEmpty(cloudEvent.getCreatedFrom().orElse("")))) { - cloudEvent.setCreatedFrom(originalMessageId); - } - } - } - // Add source if it is an error message, e.g.: kafka://mico/transform-request - if (cloudEvent.isErrorMessage().orElse(false)) { - try { - URI source = new URI("kafka://" + this.kafkaConfig.getGroupId() + "/" + this.kafkaConfig.getInputTopic()); - cloudEvent.setSource(source); - } catch (URISyntaxException e) { - log.error("Could not construct a valid source attribute for the error message. 
" + - "Caused by: {}", e.getMessage()); - } - } - } } diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java index 13aca75..93953e8 100644 --- a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java @@ -77,6 +77,9 @@ public class MessageListenerTests { @Autowired MessageListener messageListener; + @Autowired + KafkaMessageSender kafkaMessageSender; + private MicoKafkaTestHelper micoKafkaTestHelper; @PostConstruct @@ -222,7 +225,7 @@ public void testFilterOutCheck() { cloudEventSimple.setFilterOutBeforeTopic(testFilterTopic); cloudEventSimple.setIsTestMessage(true); - assertTrue("The message should be filtered out", messageListener.isTestMessageCompleted(cloudEventSimple, testFilterTopic)); + assertTrue("The message should be filtered out", kafkaMessageSender.isTestMessageCompleted(cloudEventSimple, testFilterTopic)); } /** @@ -234,7 +237,7 @@ public void testNotFilterOutCheck() { String testFilterTopic = "TestFilterTopic"; cloudEventSimple.setFilterOutBeforeTopic(testFilterTopic); - assertFalse("The message not should be filtered out, because it is not a test message", messageListener.isTestMessageCompleted(cloudEventSimple, testFilterTopic)); + assertFalse("The message not should be filtered out, because it is not a test message", kafkaMessageSender.isTestMessageCompleted(cloudEventSimple, testFilterTopic)); } /** @@ -247,7 +250,7 @@ public void testNotFilterOutCheckDifferentTopics() { cloudEventSimple.setFilterOutBeforeTopic(testFilterTopic); cloudEventSimple.setIsTestMessage(true); - assertFalse("The message not should be filtered out, because it has not reached the filter out topic", messageListener.isTestMessageCompleted(cloudEventSimple, testFilterTopic + "Difference")); + assertFalse("The message not should be filtered out, because it has 
not reached the filter out topic", kafkaMessageSender.isTestMessageCompleted(cloudEventSimple, testFilterTopic + "Difference")); } /** @@ -376,7 +379,7 @@ public void testCorrelationIdUnChanged() { public void testCreatedFrom() { MicoCloudEventImpl cloudEventSimple = CloudEventTestUtils.basicCloudEventWithRandomId(); final String originalMessageId = "OriginalMessageId"; - messageListener.setMissingHeaderFields(cloudEventSimple, originalMessageId); + kafkaMessageSender.setMissingHeaderFields(cloudEventSimple, originalMessageId); assertThat("If the id changes the createdFrom attribute has to be set", cloudEventSimple.getCreatedFrom().orElse(null), is(originalMessageId)); } @@ -386,7 +389,7 @@ public void testCreatedFrom() { @Test public void testNotCreatedFrom() { MicoCloudEventImpl cloudEventSimple = CloudEventTestUtils.basicCloudEventWithRandomId(); - messageListener.setMissingHeaderFields(cloudEventSimple, cloudEventSimple.getId()); + kafkaMessageSender.setMissingHeaderFields(cloudEventSimple, cloudEventSimple.getId()); assertThat("If the id stays the same the createdFrom attribute must be empty", cloudEventSimple.getCreatedFrom().orElse(null), is(nullValue())); } From bc924be9b84cfafc93f66a593c572675dd949030 Mon Sep 17 00:00:00 2001 From: wagnerdk <18245675+wagnerdk@users.noreply.github.com> Date: Thu, 25 Jul 2019 15:54:04 +0200 Subject: [PATCH 02/13] Move message manipulation methods into a new class As described by https://github.com/UST-MICO/mico/issues/762 we want to split up our logic. This moves the message manipulation related methods from MessageListener and KafkaMessageSender to CloudEventManipulator. 
--- .../CloudEventManipulator.java | 110 ++++++++++++++++++ .../KafkaMessageSender.java | 81 +------------ .../kafkafaasconnector/MessageListener.java | 21 +--- 3 files changed, 119 insertions(+), 93 deletions(-) create mode 100644 src/main/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulator.java diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulator.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulator.java new file mode 100644 index 0000000..abf15c3 --- /dev/null +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulator.java @@ -0,0 +1,110 @@ +package io.github.ust.mico.kafkafaasconnector; + + +import com.fasterxml.jackson.databind.JsonNode; +import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; +import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; +import io.github.ust.mico.kafkafaasconnector.kafka.RouteHistory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; + +@Slf4j +@Service +public class CloudEventManipulator { + + protected static final String ROUTE_HISTORY_TYPE_TOPIC = "topic"; + protected static final String ROUTE_HISTORY_TYPE_FAAS_FUNCTION = "faas-function"; + + @Autowired + private KafkaConfig kafkaConfig; + + /** + * Add a topic routing step to the routing history of the cloud event. 
+ * + * @param cloudEvent the cloud event to update + * @param topic the next topic the event will be sent to + * @return the updated cloud event + */ + public MicoCloudEventImpl updateRouteHistoryWithTopic(MicoCloudEventImpl cloudEvent, String topic) { + return this.updateRouteHistory(cloudEvent, topic, ROUTE_HISTORY_TYPE_TOPIC); + } + + /** + * Update the routing history in the `route` header field of the cloud event. + * + * @param cloudEvent the cloud event to update + * @param id the string id of the next routing step the message will take + * @param type the type of the routing step ("topic" or "faas-function") + * @return the updated cloud event + */ + public MicoCloudEventImpl updateRouteHistory(MicoCloudEventImpl cloudEvent, String id, String type) { + RouteHistory routingStep = new RouteHistory(type, id, ZonedDateTime.now()); + List history = cloudEvent.getRoute().map(route -> new ArrayList<>(route)).orElse(new ArrayList<>()); + history.add(routingStep); + return new MicoCloudEventImpl(cloudEvent).setRoute(history); + } + + /** + * Add a function call routing step to the routing history of the cloud event. 
+ * + * @param cloudEvent the cloud event to update + * @param functionId the id of the function applied to the cloud event next + * @return the updated cloud event + */ + public MicoCloudEventImpl updateRouteHistoryWithFunctionCall(MicoCloudEventImpl cloudEvent, String functionId) { + return this.updateRouteHistory(cloudEvent, functionId, ROUTE_HISTORY_TYPE_FAAS_FUNCTION); + } + + + /** + * Sets the time, the correlationId and the Id field of a CloudEvent message if missing + * + * @param cloudEvent the cloud event to send + * @param originalMessageId the id of the original message + */ + public void setMissingHeaderFields(MicoCloudEventImpl cloudEvent, String originalMessageId) { + // Add random id if missing + if (StringUtils.isEmpty(cloudEvent.getId())) { + cloudEvent.setRandomId(); + log.debug("Added missing id '{}' to cloud event", cloudEvent.getId()); + } + // Add time if missing + if (!cloudEvent.getTime().isPresent()) { + cloudEvent.setTime(ZonedDateTime.now()); + log.debug("Added missing time '{}' to cloud event", cloudEvent.getTime().orElse(null)); + } + if (!StringUtils.isEmpty(originalMessageId)) { + // Add correlation id if missing + if (!cloudEvent.getCorrelationId().isPresent()) { + cloudEvent.setCorrelationId(originalMessageId); + } + // Set 'created from' to the original message id only if necessary + if (!cloudEvent.getId().equals(originalMessageId)) { + if (!cloudEvent.isErrorMessage().orElse(false) || + (cloudEvent.isErrorMessage().orElse(false) && StringUtils.isEmpty(cloudEvent.getCreatedFrom().orElse("")))) { + cloudEvent.setCreatedFrom(originalMessageId); + } + } + } + // Add source if it is an error message, e.g.: kafka://mico/transform-request + if (cloudEvent.isErrorMessage().orElse(false)) { + try { + URI source = new URI("kafka://" + this.kafkaConfig.getGroupId() + "/" + this.kafkaConfig.getInputTopic()); + cloudEvent.setSource(source); + } catch (URISyntaxException e) { + log.error("Could not construct a valid source attribute for the 
error message. " + + "Caused by: {}", e.getMessage()); + } + } + } + + +} diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSender.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSender.java index c729f4c..356c430 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSender.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSender.java @@ -5,16 +5,10 @@ import io.github.ust.mico.kafkafaasconnector.exception.BatchMicoCloudEventException; import io.github.ust.mico.kafkafaasconnector.exception.MicoCloudEventException; import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; -import io.github.ust.mico.kafkafaasconnector.kafka.RouteHistory; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; -import org.springframework.util.StringUtils; - -import java.net.URI; -import java.net.URISyntaxException; -import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -28,6 +22,9 @@ public class KafkaMessageSender { @Autowired private KafkaConfig kafkaConfig; + @Autowired + private CloudEventManipulator cloudEventManipulator; + /** * Send a cloud event using the sendCloudEvent method. 
@@ -81,9 +78,9 @@ public void safeSendErrorMessage(MicoCloudEventImpl cloudEvent, String */ private void sendCloudEvent(MicoCloudEventImpl cloudEvent, String topic, String originalMessageId) throws MicoCloudEventException { try { - cloudEvent = this.updateRouteHistoryWithTopic(cloudEvent, topic); + cloudEvent = cloudEventManipulator.updateRouteHistoryWithTopic(cloudEvent, topic); // TODO commit logic/transactions - setMissingHeaderFields(cloudEvent, originalMessageId); + cloudEventManipulator.setMissingHeaderFields(cloudEvent, originalMessageId); if (!isTestMessageCompleted(cloudEvent, topic)) { log.debug("Is not necessary to filter the message. Is test message '{}', filterOutBeforeTopic: '{}', targetTopic: '{}'", cloudEvent.isTestMessage(), cloudEvent.getFilterOutBeforeTopic(), topic); kafkaTemplate.send(topic, cloudEvent); @@ -108,74 +105,6 @@ public boolean isTestMessageCompleted(MicoCloudEventImpl cloudEvent, S return cloudEvent.isTestMessage().orElse(false) && topic.equals(cloudEvent.getFilterOutBeforeTopic().orElse(null)); } - /** - * Add a topic routing step to the routing history of the cloud event. - * - * @param cloudEvent the cloud event to update - * @param topic the next topic the event will be sent to - * @return the updated cloud event - */ - private MicoCloudEventImpl updateRouteHistoryWithTopic(MicoCloudEventImpl cloudEvent, String topic) { - return this.updateRouteHistory(cloudEvent, topic, "topic"); - } - - /** - * Update the routing history in the `route` header field of the cloud event. 
- * - * @param cloudEvent the cloud event to update - * @param id the string id of the next routing step the message will take - * @param type the type of the routing step ("topic" or "faas-function") - * @return the updated cloud event - */ - public MicoCloudEventImpl updateRouteHistory(MicoCloudEventImpl cloudEvent, String id, String type) { - RouteHistory routingStep = new RouteHistory(type, id, ZonedDateTime.now()); - List history = cloudEvent.getRoute().map(route -> new ArrayList<>(route)).orElse(new ArrayList<>()); - history.add(routingStep); - return new MicoCloudEventImpl(cloudEvent).setRoute(history); - } - - /** - * Sets the time, the correlationId and the Id field of a CloudEvent message if missing - * - * @param cloudEvent the cloud event to send - * @param originalMessageId the id of the original message - */ - public void setMissingHeaderFields(MicoCloudEventImpl cloudEvent, String originalMessageId) { - // Add random id if missing - if (StringUtils.isEmpty(cloudEvent.getId())) { - cloudEvent.setRandomId(); - log.debug("Added missing id '{}' to cloud event", cloudEvent.getId()); - } - // Add time if missing - if (!cloudEvent.getTime().isPresent()) { - cloudEvent.setTime(ZonedDateTime.now()); - log.debug("Added missing time '{}' to cloud event", cloudEvent.getTime().orElse(null)); - } - if (!StringUtils.isEmpty(originalMessageId)) { - // Add correlation id if missing - if (!cloudEvent.getCorrelationId().isPresent()) { - cloudEvent.setCorrelationId(originalMessageId); - } - // Set 'created from' to the original message id only if necessary - if (!cloudEvent.getId().equals(originalMessageId)) { - if (!cloudEvent.isErrorMessage().orElse(false) || - (cloudEvent.isErrorMessage().orElse(false) && StringUtils.isEmpty(cloudEvent.getCreatedFrom().orElse("")))) { - cloudEvent.setCreatedFrom(originalMessageId); - } - } - } - // Add source if it is an error message, e.g.: kafka://mico/transform-request - if (cloudEvent.isErrorMessage().orElse(false)) { - try { - 
URI source = new URI("kafka://" + this.kafkaConfig.getGroupId() + "/" + this.kafkaConfig.getInputTopic()); - cloudEvent.setSource(source); - } catch (URISyntaxException e) { - log.error("Could not construct a valid source attribute for the error message. " + - "Caused by: {}", e.getMessage()); - } - } - } - /** * Send cloud event to default topic or topic(s) next in the routingSlip. diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java index a1c0061..56fa3a1 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java @@ -50,8 +50,6 @@ @Component public class MessageListener { - protected static final String CONTENT_TYPE = "application/json"; - @Autowired private RestTemplate restTemplate; @@ -64,6 +62,9 @@ public class MessageListener { @Autowired private KafkaMessageSender kafkaMessageSender; + @Autowired + private CloudEventManipulator cloudEventManipulator; + /** * Entry point for incoming messages from kafka. * @@ -100,20 +101,6 @@ public void receive(MicoCloudEventImpl cloudEvent) { } } - - - /** - * Add a function call routing step to the routing history of the cloud event. - * - * @param cloudEvent the cloud event to update - * @param functionId the id of the function applied to the cloud event next - * @return the updated cloud event - */ - public MicoCloudEventImpl updateRouteHistoryWithFunctionCall(MicoCloudEventImpl cloudEvent, String functionId) { - return kafkaMessageSender.updateRouteHistory(cloudEvent, functionId, "faas-function"); - } - - /** * Synchronously call the configured openFaaS function. 
* @@ -124,7 +111,7 @@ public String callFaasFunction(MicoCloudEventImpl cloudEvent) throws M try { URL functionUrl = openFaaSConfig.getFunctionUrl(); log.debug("Start request to function '{}'", functionUrl.toString()); - String cloudEventSerialized = Json.encode(this.updateRouteHistoryWithFunctionCall(cloudEvent, openFaaSConfig.getFunctionName())); + String cloudEventSerialized = Json.encode(cloudEventManipulator.updateRouteHistoryWithFunctionCall(cloudEvent, openFaaSConfig.getFunctionName())); log.debug("Serialized cloud event: {}", cloudEventSerialized); String result = restTemplate.postForObject(functionUrl.toString(), cloudEventSerialized, String.class); log.debug("Faas call resulted in: '{}'", result); From 5de3a5b25fa8547b4c2fffc920a558701449b64c Mon Sep 17 00:00:00 2001 From: wagnerdk <18245675+wagnerdk@users.noreply.github.com> Date: Thu, 25 Jul 2019 16:31:49 +0200 Subject: [PATCH 03/13] Refactor CloudEventManipulator --- .../CloudEventManipulator.java | 104 ++++++++++++------ .../MessageListenerTests.java | 7 +- 2 files changed, 78 insertions(+), 33 deletions(-) diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulator.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulator.java index abf15c3..0ac2ae0 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulator.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulator.java @@ -20,8 +20,8 @@ @Service public class CloudEventManipulator { - protected static final String ROUTE_HISTORY_TYPE_TOPIC = "topic"; - protected static final String ROUTE_HISTORY_TYPE_FAAS_FUNCTION = "faas-function"; + private static final String ROUTE_HISTORY_TYPE_TOPIC = "topic"; + private static final String ROUTE_HISTORY_TYPE_FAAS_FUNCTION = "faas-function"; @Autowired private KafkaConfig kafkaConfig; @@ -47,9 +47,9 @@ public MicoCloudEventImpl updateRouteHistoryWithTopic(MicoCloudEventIm */ public MicoCloudEventImpl 
updateRouteHistory(MicoCloudEventImpl cloudEvent, String id, String type) { RouteHistory routingStep = new RouteHistory(type, id, ZonedDateTime.now()); - List history = cloudEvent.getRoute().map(route -> new ArrayList<>(route)).orElse(new ArrayList<>()); + List history = cloudEvent.getRoute().map(ArrayList::new).orElse(new ArrayList<>()); history.add(routingStep); - return new MicoCloudEventImpl(cloudEvent).setRoute(history); + return new MicoCloudEventImpl<>(cloudEvent).setRoute(history); } /** @@ -71,40 +71,82 @@ public MicoCloudEventImpl updateRouteHistoryWithFunctionCall(MicoCloud * @param originalMessageId the id of the original message */ public void setMissingHeaderFields(MicoCloudEventImpl cloudEvent, String originalMessageId) { - // Add random id if missing - if (StringUtils.isEmpty(cloudEvent.getId())) { - cloudEvent.setRandomId(); - log.debug("Added missing id '{}' to cloud event", cloudEvent.getId()); - } - // Add time if missing - if (!cloudEvent.getTime().isPresent()) { - cloudEvent.setTime(ZonedDateTime.now()); - log.debug("Added missing time '{}' to cloud event", cloudEvent.getTime().orElse(null)); - } + + setMissingId(cloudEvent); + addMissingTime(cloudEvent); + if (!StringUtils.isEmpty(originalMessageId)) { - // Add correlation id if missing - if (!cloudEvent.getCorrelationId().isPresent()) { - cloudEvent.setCorrelationId(originalMessageId); - } - // Set 'created from' to the original message id only if necessary - if (!cloudEvent.getId().equals(originalMessageId)) { - if (!cloudEvent.isErrorMessage().orElse(false) || - (cloudEvent.isErrorMessage().orElse(false) && StringUtils.isEmpty(cloudEvent.getCreatedFrom().orElse("")))) { - cloudEvent.setCreatedFrom(originalMessageId); - } - } + setMissingCorrelationId(cloudEvent, originalMessageId); + setMissingCreatedFrom(cloudEvent, originalMessageId); } + // Add source if it is an error message, e.g.: kafka://mico/transform-request if (cloudEvent.isErrorMessage().orElse(false)) { - try { - URI source = 
new URI("kafka://" + this.kafkaConfig.getGroupId() + "/" + this.kafkaConfig.getInputTopic()); - cloudEvent.setSource(source); - } catch (URISyntaxException e) { - log.error("Could not construct a valid source attribute for the error message. " + - "Caused by: {}", e.getMessage()); + setMissingSource(cloudEvent); + } + } + + /** + * Sets the source field of a cloud event message to "kafka://{groupId}/{inputTopic}". + * @param cloudEvent + */ + private void setMissingSource(MicoCloudEventImpl cloudEvent) { + try { + URI source = new URI("kafka://" + this.kafkaConfig.getGroupId() + "/" + this.kafkaConfig.getInputTopic()); + cloudEvent.setSource(source); + } catch (URISyntaxException e) { + log.error("Could not construct a valid source attribute for the error message. " + + "Caused by: {}", e.getMessage()); + } + } + + /** + * Adds the createdFrom field to the message if the messageId is different from the originalMessageId and + * the createdFrom field is empty. + * @param cloudEvent + * @param originalMessageId + */ + private void setMissingCreatedFrom(MicoCloudEventImpl cloudEvent, String originalMessageId) { + if (!cloudEvent.getId().equals(originalMessageId)) { + if (!cloudEvent.isErrorMessage().orElse(false) || + (cloudEvent.isErrorMessage().orElse(false) && StringUtils.isEmpty(cloudEvent.getCreatedFrom().orElse("")))) { + cloudEvent.setCreatedFrom(originalMessageId); } } } + /** + * Sets the message correlationId to the originalMessageId if the correlationId is missing. + * @param cloudEvent + * @param originalMessageId + */ + private void setMissingCorrelationId(MicoCloudEventImpl cloudEvent, String originalMessageId) { + if (!cloudEvent.getCorrelationId().isPresent()) { + cloudEvent.setCorrelationId(originalMessageId); + } + } + + /** + * Adds the required field 'time' if it is missing. 
+ * @param cloudEvent + */ + private void addMissingTime(MicoCloudEventImpl cloudEvent) { + if (!cloudEvent.getTime().isPresent()) { + cloudEvent.setTime(ZonedDateTime.now()); + log.debug("Added missing time '{}' to cloud event", cloudEvent.getTime().orElse(null)); + } + } + + /** + * Sets a missing message id to a randomly generated one. + * @param cloudEvent + */ + private void setMissingId(MicoCloudEventImpl cloudEvent) { + if (StringUtils.isEmpty(cloudEvent.getId())) { + cloudEvent.setRandomId(); + log.debug("Added missing id '{}' to cloud event", cloudEvent.getId()); + } + } + } diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java index 93953e8..80f7d31 100644 --- a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java @@ -80,6 +80,9 @@ public class MessageListenerTests { @Autowired KafkaMessageSender kafkaMessageSender; + @Autowired + CloudEventManipulator cloudEventManipulator; + private MicoKafkaTestHelper micoKafkaTestHelper; @PostConstruct @@ -379,7 +382,7 @@ public void testCorrelationIdUnChanged() { public void testCreatedFrom() { MicoCloudEventImpl cloudEventSimple = CloudEventTestUtils.basicCloudEventWithRandomId(); final String originalMessageId = "OriginalMessageId"; - kafkaMessageSender.setMissingHeaderFields(cloudEventSimple, originalMessageId); + cloudEventManipulator.setMissingHeaderFields(cloudEventSimple, originalMessageId); assertThat("If the id changes the createdFrom attribute has to be set", cloudEventSimple.getCreatedFrom().orElse(null), is(originalMessageId)); } @@ -389,7 +392,7 @@ public void testCreatedFrom() { @Test public void testNotCreatedFrom() { MicoCloudEventImpl cloudEventSimple = CloudEventTestUtils.basicCloudEventWithRandomId(); - kafkaMessageSender.setMissingHeaderFields(cloudEventSimple, 
cloudEventSimple.getId()); + cloudEventManipulator.setMissingHeaderFields(cloudEventSimple, cloudEventSimple.getId()); assertThat("If the id stays the same the createdFrom attribute must be empty", cloudEventSimple.getCreatedFrom().orElse(null), is(nullValue())); } From 0b633223de965a414560067341c4356ebacf91a2 Mon Sep 17 00:00:00 2001 From: wagnerdk <18245675+wagnerdk@users.noreply.github.com> Date: Thu, 25 Jul 2019 17:05:56 +0200 Subject: [PATCH 04/13] Move FaaS related methods into new FaasController As described by https://github.com/UST-MICO/mico/issues/762 we want to split up the message receiving and the FaaS logic. --- .../kafkafaasconnector/FaasController.java | 80 +++++++++++++++++++ .../kafkafaasconnector/MessageListener.java | 72 +---------------- .../MessageListenerTests.java | 7 +- 3 files changed, 88 insertions(+), 71 deletions(-) create mode 100644 src/main/java/io/github/ust/mico/kafkafaasconnector/FaasController.java diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/FaasController.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/FaasController.java new file mode 100644 index 0000000..4b7aca9 --- /dev/null +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/FaasController.java @@ -0,0 +1,80 @@ +package io.github.ust.mico.kafkafaasconnector; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import io.cloudevents.json.Json; +import io.github.ust.mico.kafkafaasconnector.configuration.OpenFaaSConfig; +import io.github.ust.mico.kafkafaasconnector.exception.MicoCloudEventException; +import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.web.client.HttpStatusCodeException; +import org.springframework.web.client.RestTemplate; + +import java.net.MalformedURLException; +import 
java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +@Slf4j +@Service +public class FaasController { + + @Autowired + private CloudEventManipulator cloudEventManipulator; + + @Autowired + private OpenFaaSConfig openFaaSConfig; + + @Autowired + private RestTemplate restTemplate; + + /** + * Synchronously call the configured openFaaS function. + * + * @param cloudEvent the cloud event used as parameter for the function + * @return the result of the function call (in serialized form) + */ + public List> callFaasFunction(MicoCloudEventImpl cloudEvent) throws MicoCloudEventException { + if (!this.openFaaSConfig.isSkipFunctionCall()) { + try { + URL functionUrl = openFaaSConfig.getFunctionUrl(); + log.debug("Start request to function '{}'", functionUrl.toString()); + String cloudEventSerialized = Json.encode(cloudEventManipulator.updateRouteHistoryWithFunctionCall(cloudEvent, openFaaSConfig.getFunctionName())); + log.debug("Serialized cloud event: {}", cloudEventSerialized); + String result = restTemplate.postForObject(functionUrl.toString(), cloudEventSerialized, String.class); + log.debug("Faas call resulted in: '{}'", result); + return parseFunctionResult(result, cloudEvent); + } catch (MalformedURLException e) { + throw new MicoCloudEventException("Failed to call faas-function. Caused by: " + e.getMessage(), cloudEvent); + } catch (IllegalStateException e) { + log.error("Failed to serialize CloudEvent '{}'.", cloudEvent); + throw new MicoCloudEventException("Failed to serialize CloudEvent while calling the faas-function.", cloudEvent); + } catch (HttpStatusCodeException e) { + log.error("A client error occurred with http status:{} . 
These exceptions are triggered if the FaaS function does not return 200 OK as the status code", e.getStatusCode(), e); + throw new MicoCloudEventException(e.toString(), cloudEvent); + } + } else { + return Collections.singletonList(cloudEvent); + } + } + + /** + * Parse the result of a faas function call. + * + * @param sourceCloudEvent only used for better error messages + * @return an ArrayList of cloud events + */ + public ArrayList> parseFunctionResult(String functionResult, MicoCloudEventImpl sourceCloudEvent) throws MicoCloudEventException { + try { + return Json.decodeValue(functionResult, new TypeReference>>() { + }); + } catch (IllegalStateException e) { + log.error("Failed to parse JSON from response '{}'.", functionResult); + throw new MicoCloudEventException("Failed to parse JSON from response from the faas-function.", sourceCloudEvent); + } + } + +} diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java index 56fa3a1..2e44c50 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java @@ -19,51 +19,31 @@ package io.github.ust.mico.kafkafaasconnector; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; -import io.cloudevents.json.Json; import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; -import io.github.ust.mico.kafkafaasconnector.configuration.OpenFaaSConfig; -import io.github.ust.mico.kafkafaasconnector.exception.BatchMicoCloudEventException; import io.github.ust.mico.kafkafaasconnector.exception.MicoCloudEventException; import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; -import io.github.ust.mico.kafkafaasconnector.kafka.RouteHistory; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; -import org.springframework.web.client.HttpStatusCodeException; -import org.springframework.web.client.RestTemplate; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; @Slf4j @Component public class MessageListener { - @Autowired - private RestTemplate restTemplate; - @Autowired private KafkaConfig kafkaConfig; - @Autowired - private OpenFaaSConfig openFaaSConfig; - @Autowired private KafkaMessageSender kafkaMessageSender; @Autowired - private CloudEventManipulator cloudEventManipulator; + private FaasController faasController; + /** * Entry point for incoming messages from kafka. @@ -85,12 +65,8 @@ public void receive(MicoCloudEventImpl cloudEvent) { if (cloudEvent.getExpiryDate().map(exp -> exp.compareTo(ZonedDateTime.now()) < 0).orElse(false)) { log.debug("Received expired message!"); throw new MicoCloudEventException("CloudEvent has already expired!", cloudEvent); - } else if (this.openFaaSConfig.isSkipFunctionCall()) { - // when skipping the openFaaS function just pass on the original cloudEvent - kafkaMessageSender.safeSendCloudEvent(cloudEvent, originalMessageId); } else { - String functionResult = callFaasFunction(cloudEvent); - ArrayList> events = parseFunctionResult(functionResult, cloudEvent); + List> events = faasController.callFaasFunction(cloudEvent); for (MicoCloudEventImpl event : events) { // individually wrap each cloud event in the results for sending kafkaMessageSender.safeSendCloudEvent(event, originalMessageId); @@ -101,46 +77,4 @@ public void receive(MicoCloudEventImpl cloudEvent) { } } - /** - * Synchronously call the configured openFaaS function. 
- * - * @param cloudEvent the cloud event used as parameter for the function - * @return the result of the function call (in serialized form) - */ - public String callFaasFunction(MicoCloudEventImpl cloudEvent) throws MicoCloudEventException { - try { - URL functionUrl = openFaaSConfig.getFunctionUrl(); - log.debug("Start request to function '{}'", functionUrl.toString()); - String cloudEventSerialized = Json.encode(cloudEventManipulator.updateRouteHistoryWithFunctionCall(cloudEvent, openFaaSConfig.getFunctionName())); - log.debug("Serialized cloud event: {}", cloudEventSerialized); - String result = restTemplate.postForObject(functionUrl.toString(), cloudEventSerialized, String.class); - log.debug("Faas call resulted in: '{}'", result); - return result; - } catch (MalformedURLException e) { - throw new MicoCloudEventException("Failed to call faas-function. Caused by: " + e.getMessage(), cloudEvent); - } catch (IllegalStateException e) { - log.error("Failed to serialize CloudEvent '{}'.", cloudEvent); - throw new MicoCloudEventException("Failed to serialize CloudEvent while calling the faas-function.", cloudEvent); - } catch (HttpStatusCodeException e) { - log.error("A client error occurred with http status:{} . These exceptions are triggered if the FaaS function does not return 200 OK as the status code", e.getStatusCode(), e); - throw new MicoCloudEventException(e.toString(), cloudEvent); - } - } - - /** - * Parse the result of a faas function call. 
- * - * @param sourceCloudEvent only used for better error messages - * @return an ArrayList of cloud events - */ - public ArrayList> parseFunctionResult(String functionResult, MicoCloudEventImpl sourceCloudEvent) throws MicoCloudEventException { - try { - return Json.decodeValue(functionResult, new TypeReference>>() { - }); - } catch (IllegalStateException e) { - log.error("Failed to parse JSON from response '{}'.", functionResult); - throw new MicoCloudEventException("Failed to parse JSON from response from the faas-function.", sourceCloudEvent); - } - } - } diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java index 80f7d31..5934283 100644 --- a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java @@ -77,6 +77,9 @@ public class MessageListenerTests { @Autowired MessageListener messageListener; + @Autowired + FaasController faasController; + @Autowired KafkaMessageSender kafkaMessageSender; @@ -100,7 +103,7 @@ public void before() { @Test public void parseEmptyFunctionResult() throws MicoCloudEventException { - ArrayList> result = this.messageListener.parseFunctionResult("[]", null); + ArrayList> result = this.faasController.parseFunctionResult("[]", null); assertNotNull(result); assertEquals(0, result.size()); } @@ -113,7 +116,7 @@ public void parseFunctionResult() throws MicoCloudEventException { input.add(cloudEvent1); input.add(cloudEvent2); String functionInput = Json.encode(input); - ArrayList> result = this.messageListener.parseFunctionResult(functionInput, null); + ArrayList> result = this.faasController.parseFunctionResult(functionInput, null); assertNotNull(result); assertEquals(2, result.size()); assertEquals(result.get(0).getId(), cloudEvent1.getId()); From 9f1257194f49a9424396d6cc6a1b06ce6aec5c96 Mon Sep 17 00:00:00 2001 From: 
wagnerdk <18245675+wagnerdk@users.noreply.github.com> Date: Thu, 25 Jul 2019 18:10:04 +0200 Subject: [PATCH 05/13] Refactor MessageListener --- .../kafkafaasconnector/MessageListener.java | 40 ++++++++++++------- .../kafka/CloudEventDeserializer.java | 5 +++ 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java index 2e44c50..6595697 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java @@ -58,23 +58,35 @@ public void receive(MicoCloudEventImpl cloudEvent) { String originalMessageId = cloudEvent.getId(); try { - if (!cloudEvent.getData().isPresent()) { - // data is entirely optional - log.debug("Received message does not include any data!"); - } - if (cloudEvent.getExpiryDate().map(exp -> exp.compareTo(ZonedDateTime.now()) < 0).orElse(false)) { - log.debug("Received expired message!"); - throw new MicoCloudEventException("CloudEvent has already expired!", cloudEvent); - } else { - List> events = faasController.callFaasFunction(cloudEvent); - for (MicoCloudEventImpl event : events) { - // individually wrap each cloud event in the results for sending - kafkaMessageSender.safeSendCloudEvent(event, originalMessageId); - } - } + handleExpiredMessage(cloudEvent); + + List> events = faasController.callFaasFunction(cloudEvent); + events.forEach(event-> kafkaMessageSender.safeSendCloudEvent(event, originalMessageId)); + } catch (MicoCloudEventException e) { kafkaMessageSender.safeSendErrorMessage(e.getErrorEvent(), this.kafkaConfig.getInvalidMessageTopic(), originalMessageId); } } + /** + * Logs expired messages and throw a {@code MicoCloudEventException} if the message is expired. 
+ * @param cloudEvent + * @throws MicoCloudEventException + */ + private void handleExpiredMessage(MicoCloudEventImpl cloudEvent) throws MicoCloudEventException { + if (isMessageExpired(cloudEvent)) { + log.debug("Received expired message!"); + throw new MicoCloudEventException("CloudEvent has already expired!", cloudEvent); + } + } + + /** + * Checks if the message is expired + * @param cloudEvent + * @return + */ + private boolean isMessageExpired(MicoCloudEventImpl cloudEvent) { + return cloudEvent.getExpiryDate().map(exp -> exp.compareTo(ZonedDateTime.now()) < 0).orElse(false); + } + } diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/kafka/CloudEventDeserializer.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/kafka/CloudEventDeserializer.java index f8208c2..b898703 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/kafka/CloudEventDeserializer.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/kafka/CloudEventDeserializer.java @@ -47,6 +47,11 @@ public MicoCloudEventImpl deserialize(String topic, byte[] data) { MicoCloudEventImpl micoCloudEvent = Json.decodeValue(message, new TypeReference>() { }); log.debug("Deserialized micoCloudEvent '{}' on topic: '{}'",micoCloudEvent.toString(),topic); + + if (!micoCloudEvent.getData().isPresent()) { + // data is entirely optional + log.debug("Received message does not include any data!"); + } return micoCloudEvent; } catch (IllegalStateException e) { throw new SerializationException("Could not create an CloudEvent message", e); From 10d19d347ffb08ee3b5c51425e84d9f8ed1a3912 Mon Sep 17 00:00:00 2001 From: wagnerdk <18245675+wagnerdk@users.noreply.github.com> Date: Thu, 25 Jul 2019 18:27:21 +0200 Subject: [PATCH 06/13] Move message processing related classes into a new package --- .../io/github/ust/mico/kafkafaasconnector/MessageListener.java | 2 ++ .../{ => MessageProcessing}/CloudEventManipulator.java | 2 +- .../{ => MessageProcessing}/FaasController.java | 2 +- .../{ => 
MessageProcessing}/KafkaMessageSender.java | 2 +- .../ust/mico/kafkafaasconnector/MessageListenerTests.java | 3 +++ 5 files changed, 8 insertions(+), 3 deletions(-) rename src/main/java/io/github/ust/mico/kafkafaasconnector/{ => MessageProcessing}/CloudEventManipulator.java (98%) rename src/main/java/io/github/ust/mico/kafkafaasconnector/{ => MessageProcessing}/FaasController.java (98%) rename src/main/java/io/github/ust/mico/kafkafaasconnector/{ => MessageProcessing}/KafkaMessageSender.java (98%) diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java index 6595697..83ffa6c 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java @@ -20,6 +20,8 @@ package io.github.ust.mico.kafkafaasconnector; import com.fasterxml.jackson.databind.JsonNode; +import io.github.ust.mico.kafkafaasconnector.MessageProcessing.FaasController; +import io.github.ust.mico.kafkafaasconnector.MessageProcessing.KafkaMessageSender; import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; import io.github.ust.mico.kafkafaasconnector.exception.MicoCloudEventException; import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulator.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/CloudEventManipulator.java similarity index 98% rename from src/main/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulator.java rename to src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/CloudEventManipulator.java index 0ac2ae0..5f3e770 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulator.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/CloudEventManipulator.java @@ -1,4 +1,4 @@ 
-package io.github.ust.mico.kafkafaasconnector; +package io.github.ust.mico.kafkafaasconnector.MessageProcessing; import com.fasterxml.jackson.databind.JsonNode; diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/FaasController.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/FaasController.java similarity index 98% rename from src/main/java/io/github/ust/mico/kafkafaasconnector/FaasController.java rename to src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/FaasController.java index 4b7aca9..77383fa 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/FaasController.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/FaasController.java @@ -1,4 +1,4 @@ -package io.github.ust.mico.kafkafaasconnector; +package io.github.ust.mico.kafkafaasconnector.MessageProcessing; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSender.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/KafkaMessageSender.java similarity index 98% rename from src/main/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSender.java rename to src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/KafkaMessageSender.java index 356c430..803d6c4 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSender.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/KafkaMessageSender.java @@ -1,4 +1,4 @@ -package io.github.ust.mico.kafkafaasconnector; +package io.github.ust.mico.kafkafaasconnector.MessageProcessing; import com.fasterxml.jackson.databind.JsonNode; import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java 
b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java index 5934283..15bbcb9 100644 --- a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java @@ -21,6 +21,9 @@ import com.fasterxml.jackson.databind.JsonNode; import io.cloudevents.json.Json; +import io.github.ust.mico.kafkafaasconnector.MessageProcessing.CloudEventManipulator; +import io.github.ust.mico.kafkafaasconnector.MessageProcessing.FaasController; +import io.github.ust.mico.kafkafaasconnector.MessageProcessing.KafkaMessageSender; import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; import io.github.ust.mico.kafkafaasconnector.exception.MicoCloudEventException; import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; From 614602023cdc6f53f0b001a53120d926f602829f Mon Sep 17 00:00:00 2001 From: wagnerdk <18245675+wagnerdk@users.noreply.github.com> Date: Thu, 25 Jul 2019 18:40:01 +0200 Subject: [PATCH 07/13] Add missing license --- .../CloudEventManipulator.java | 20 ++++++++++++++++++- .../MessageProcessing/FaasController.java | 19 ++++++++++++++++++ .../MessageProcessing/KafkaMessageSender.java | 19 ++++++++++++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/CloudEventManipulator.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/CloudEventManipulator.java index 5f3e770..c56f838 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/CloudEventManipulator.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/CloudEventManipulator.java @@ -1,5 +1,23 @@ -package io.github.ust.mico.kafkafaasconnector.MessageProcessing; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.github.ust.mico.kafkafaasconnector.MessageProcessing; import com.fasterxml.jackson.databind.JsonNode; import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/FaasController.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/FaasController.java index 77383fa..cf57a22 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/FaasController.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/FaasController.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package io.github.ust.mico.kafkafaasconnector.MessageProcessing; import com.fasterxml.jackson.core.type.TypeReference; diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/KafkaMessageSender.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/KafkaMessageSender.java index 803d6c4..fe92d60 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/KafkaMessageSender.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/KafkaMessageSender.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package io.github.ust.mico.kafkafaasconnector.MessageProcessing; import com.fasterxml.jackson.databind.JsonNode; From f531bf1ef68900eeef62e71eb6475ed492a751e9 Mon Sep 17 00:00:00 2001 From: wagnerdk <18245675+wagnerdk@users.noreply.github.com> Date: Thu, 25 Jul 2019 21:08:59 +0200 Subject: [PATCH 08/13] Add tests for message deserialization --- .../kafkafaasconnector/MessageListener.java | 2 ++ .../MessageListenerTests.java | 25 +++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java index 83ffa6c..65180c5 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java @@ -91,4 +91,6 @@ private boolean isMessageExpired(MicoCloudEventImpl cloudEvent) { return cloudEvent.getExpiryDate().map(exp -> exp.compareTo(ZonedDateTime.now()) < 0).orElse(false); } + + } diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java index 15bbcb9..803e279 100644 --- a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java @@ -25,11 +25,14 @@ import io.github.ust.mico.kafkafaasconnector.MessageProcessing.FaasController; import io.github.ust.mico.kafkafaasconnector.MessageProcessing.KafkaMessageSender; import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; +import io.github.ust.mico.kafkafaasconnector.configuration.OpenFaaSConfig; import io.github.ust.mico.kafkafaasconnector.exception.MicoCloudEventException; +import io.github.ust.mico.kafkafaasconnector.kafka.CloudEventDeserializer; import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; import 
io.github.ust.mico.kafkafaasconnector.kafka.RouteHistory; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.errors.SerializationException; import org.exparity.hamcrest.date.ZonedDateTimeMatchers; import org.junit.ClassRule; import org.junit.Test; @@ -47,6 +50,7 @@ import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.PostConstruct; +import java.nio.charset.Charset; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -402,4 +406,25 @@ public void testNotCreatedFrom() { assertThat("If the id stays the same the createdFrom attribute must be empty", cloudEventSimple.getCreatedFrom().orElse(null), is(nullValue())); } + + /** + * Tests message deserialization with a broken message + */ + @Test(expected = SerializationException.class) + public void testBrokenMessageDeserialization(){ + CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer(); + String invalidMessage = "InvalidMessage"; + cloudEventDeserializer.deserialize("",invalidMessage.getBytes(Charset.defaultCharset())); + } + + /** + * Tests message serialization with a empty but not null message + */ + @Test(expected = SerializationException.class) + public void testEmptyMessageSerialization(){ + CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer(); + byte[] message = {}; + cloudEventDeserializer.deserialize("",message); + } + } From ea67f316096d7ce11dd3700b4fa20dd2946a02f2 Mon Sep 17 00:00:00 2001 From: David Kopp Date: Thu, 25 Jul 2019 22:47:24 +0200 Subject: [PATCH 09/13] Reformat files --- .../mico/kafkafaasconnector/MessageListener.java | 7 +++---- .../MessageProcessing/CloudEventManipulator.java | 7 +++++-- .../MessageProcessing/KafkaMessageSender.java | 5 ++--- .../kafka/CloudEventDeserializer.java | 2 +- .../kafkafaasconnector/MessageListenerTests.java | 15 +++++++-------- 5 files changed, 18 
insertions(+), 18 deletions(-) diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java index 65180c5..98851e6 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java @@ -46,7 +46,6 @@ public class MessageListener { @Autowired private FaasController faasController; - /** * Entry point for incoming messages from kafka. * @@ -63,7 +62,7 @@ public void receive(MicoCloudEventImpl cloudEvent) { handleExpiredMessage(cloudEvent); List> events = faasController.callFaasFunction(cloudEvent); - events.forEach(event-> kafkaMessageSender.safeSendCloudEvent(event, originalMessageId)); + events.forEach(event -> kafkaMessageSender.safeSendCloudEvent(event, originalMessageId)); } catch (MicoCloudEventException e) { kafkaMessageSender.safeSendErrorMessage(e.getErrorEvent(), this.kafkaConfig.getInvalidMessageTopic(), originalMessageId); @@ -72,6 +71,7 @@ public void receive(MicoCloudEventImpl cloudEvent) { /** * Logs expired messages and throw a {@code MicoCloudEventException} if the message is expired. 
+ * * @param cloudEvent * @throws MicoCloudEventException */ @@ -84,6 +84,7 @@ private void handleExpiredMessage(MicoCloudEventImpl cloudEvent) throw /** * Checks if the message is expired + * * @param cloudEvent * @return */ @@ -91,6 +92,4 @@ private boolean isMessageExpired(MicoCloudEventImpl cloudEvent) { return cloudEvent.getExpiryDate().map(exp -> exp.compareTo(ZonedDateTime.now()) < 0).orElse(false); } - - } diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/CloudEventManipulator.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/CloudEventManipulator.java index c56f838..90b0fd9 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/CloudEventManipulator.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/CloudEventManipulator.java @@ -81,7 +81,6 @@ public MicoCloudEventImpl updateRouteHistoryWithFunctionCall(MicoCloud return this.updateRouteHistory(cloudEvent, functionId, ROUTE_HISTORY_TYPE_FAAS_FUNCTION); } - /** * Sets the time, the correlationId and the Id field of a CloudEvent message if missing * @@ -106,6 +105,7 @@ public void setMissingHeaderFields(MicoCloudEventImpl cloudEvent, Stri /** * Sets the source field of an cloud event message to "kafka://{groupId}/{inputTopic}". + * * @param cloudEvent */ private void setMissingSource(MicoCloudEventImpl cloudEvent) { @@ -121,6 +121,7 @@ private void setMissingSource(MicoCloudEventImpl cloudEvent) { /** * Adds the createdFrom field to the message if the messageId is different from the originalMessageId and * the createdFrom field is empty. 
+ * * @param cloudEvent * @param originalMessageId */ @@ -135,6 +136,7 @@ private void setMissingCreatedFrom(MicoCloudEventImpl cloudEvent, Stri /** * Sets the message correlationId to the originalMessageId if the correlationId is missing + * * @param cloudEvent * @param originalMessageId */ @@ -146,6 +148,7 @@ private void setMissingCorrelationId(MicoCloudEventImpl cloudEvent, St /** * Adds the required field 'time' if it is missing. + * * @param cloudEvent */ private void addMissingTime(MicoCloudEventImpl cloudEvent) { @@ -157,6 +160,7 @@ private void addMissingTime(MicoCloudEventImpl cloudEvent) { /** * Sets a missing message id to a randomly generated one. + * * @param cloudEvent */ private void setMissingId(MicoCloudEventImpl cloudEvent) { @@ -166,5 +170,4 @@ private void setMissingId(MicoCloudEventImpl cloudEvent) { } } - } diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/KafkaMessageSender.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/KafkaMessageSender.java index fe92d60..fb56fa1 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/KafkaMessageSender.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/KafkaMessageSender.java @@ -28,9 +28,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; + import java.util.ArrayList; import java.util.LinkedList; import java.util.List; + @Slf4j @Service public class KafkaMessageSender { @@ -44,7 +46,6 @@ public class KafkaMessageSender { @Autowired private CloudEventManipulator cloudEventManipulator; - /** * Send a cloud event using the sendCloudEvent method. *

@@ -124,7 +125,6 @@ public boolean isTestMessageCompleted(MicoCloudEventImpl cloudEvent, S return cloudEvent.isTestMessage().orElse(false) && topic.equals(cloudEvent.getFilterOutBeforeTopic().orElse(null)); } - /** * Send cloud event to default topic or topic(s) next in the routingSlip. * @@ -153,5 +153,4 @@ private void sendCloudEvent(MicoCloudEventImpl cloudEvent, String orig } } - } diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/kafka/CloudEventDeserializer.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/kafka/CloudEventDeserializer.java index b898703..7c74b1e 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/kafka/CloudEventDeserializer.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/kafka/CloudEventDeserializer.java @@ -46,7 +46,7 @@ public MicoCloudEventImpl deserialize(String topic, byte[] data) { log.debug("Trying to parse the message:" + message); MicoCloudEventImpl micoCloudEvent = Json.decodeValue(message, new TypeReference>() { }); - log.debug("Deserialized micoCloudEvent '{}' on topic: '{}'",micoCloudEvent.toString(),topic); + log.debug("Deserialized micoCloudEvent '{}' on topic: '{}'", micoCloudEvent.toString(), topic); if (!micoCloudEvent.getData().isPresent()) { // data is entirely optional diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java index 803e279..e23c26e 100644 --- a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java @@ -25,7 +25,6 @@ import io.github.ust.mico.kafkafaasconnector.MessageProcessing.FaasController; import io.github.ust.mico.kafkafaasconnector.MessageProcessing.KafkaMessageSender; import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; -import io.github.ust.mico.kafkafaasconnector.configuration.OpenFaaSConfig; import 
io.github.ust.mico.kafkafaasconnector.exception.MicoCloudEventException; import io.github.ust.mico.kafkafaasconnector.kafka.CloudEventDeserializer; import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; @@ -99,13 +98,13 @@ public class MessageListenerTests { public void before() { this.micoKafkaTestHelper = new MicoKafkaTestHelper(embeddedKafka, kafkaConfig); template = this.micoKafkaTestHelper.getTemplate(); - //We need to add them outside of the rule because the autowired kakfaConfig is not accessible from the static rule - //We can not use @BeforeClass which is only executed once because it has to be static and we do not have access to the autowired kakfaConfig + //We need to add them outside of the rule because the autowired kafkaConfig is not accessible from the static rule + //We can not use @BeforeClass which is only executed once because it has to be static and we do not have access to the autowired kafkaConfig Set requiredTopics = this.micoKafkaTestHelper.getRequiredTopics(); Set alreadySetTopics = this.micoKafkaTestHelper.requestActuallySetTopics(); requiredTopics.removeAll(alreadySetTopics); - requiredTopics.forEach(topic -> embeddedKafka.addTopics(topic)); + requiredTopics.forEach(embeddedKafka::addTopics); } @Test @@ -411,20 +410,20 @@ public void testNotCreatedFrom() { * Tests message deserialization with a broken message */ @Test(expected = SerializationException.class) - public void testBrokenMessageDeserialization(){ + public void testBrokenMessageDeserialization() { CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer(); String invalidMessage = "InvalidMessage"; - cloudEventDeserializer.deserialize("",invalidMessage.getBytes(Charset.defaultCharset())); + cloudEventDeserializer.deserialize("", invalidMessage.getBytes(Charset.defaultCharset())); } /** * Tests message serialization with a empty but not null message */ @Test(expected = SerializationException.class) - public void testEmptyMessageSerialization(){ + 
public void testEmptyMessageSerialization() { CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer(); byte[] message = {}; - cloudEventDeserializer.deserialize("",message); + cloudEventDeserializer.deserialize("", message); } } From cc98dd871be6ab0af7296f4e98d2de4af53232fb Mon Sep 17 00:00:00 2001 From: David Kopp Date: Thu, 25 Jul 2019 22:47:58 +0200 Subject: [PATCH 10/13] Improve error message regarding HTTP client error --- .../kafkafaasconnector/MessageProcessing/FaasController.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/FaasController.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/FaasController.java index cf57a22..de27dee 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/FaasController.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/FaasController.java @@ -58,8 +58,9 @@ public class FaasController { */ public List> callFaasFunction(MicoCloudEventImpl cloudEvent) throws MicoCloudEventException { if (!this.openFaaSConfig.isSkipFunctionCall()) { + URL functionUrl = null; try { - URL functionUrl = openFaaSConfig.getFunctionUrl(); + functionUrl = openFaaSConfig.getFunctionUrl(); log.debug("Start request to function '{}'", functionUrl.toString()); String cloudEventSerialized = Json.encode(cloudEventManipulator.updateRouteHistoryWithFunctionCall(cloudEvent, openFaaSConfig.getFunctionName())); log.debug("Serialized cloud event: {}", cloudEventSerialized); @@ -72,7 +73,7 @@ public List> callFaasFunction(MicoCloudEventImpl Date: Sat, 27 Jul 2019 16:33:14 +0200 Subject: [PATCH 11/13] Fix package name and method naming consistency --- .../kafkafaasconnector/MessageListener.java | 4 +- .../CloudEventManipulator.java | 6 +-- .../FaasController.java | 38 +++++++++---------- .../KafkaMessageSender.java | 2 +- .../MessageListenerTests.java | 7 ++-- 5 files 
changed, 28 insertions(+), 29 deletions(-) rename src/main/java/io/github/ust/mico/kafkafaasconnector/{MessageProcessing => messageprocessing}/CloudEventManipulator.java (97%) rename src/main/java/io/github/ust/mico/kafkafaasconnector/{MessageProcessing => messageprocessing}/FaasController.java (67%) rename src/main/java/io/github/ust/mico/kafkafaasconnector/{MessageProcessing => messageprocessing}/KafkaMessageSender.java (99%) diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java index 65180c5..afb2f22 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java @@ -20,8 +20,8 @@ package io.github.ust.mico.kafkafaasconnector; import com.fasterxml.jackson.databind.JsonNode; -import io.github.ust.mico.kafkafaasconnector.MessageProcessing.FaasController; -import io.github.ust.mico.kafkafaasconnector.MessageProcessing.KafkaMessageSender; +import io.github.ust.mico.kafkafaasconnector.messageprocessing.FaasController; +import io.github.ust.mico.kafkafaasconnector.messageprocessing.KafkaMessageSender; import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; import io.github.ust.mico.kafkafaasconnector.exception.MicoCloudEventException; import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/CloudEventManipulator.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/messageprocessing/CloudEventManipulator.java similarity index 97% rename from src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/CloudEventManipulator.java rename to src/main/java/io/github/ust/mico/kafkafaasconnector/messageprocessing/CloudEventManipulator.java index c56f838..1de706f 100644 --- 
a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/CloudEventManipulator.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/messageprocessing/CloudEventManipulator.java @@ -17,7 +17,7 @@ * under the License. */ -package io.github.ust.mico.kafkafaasconnector.MessageProcessing; +package io.github.ust.mico.kafkafaasconnector.messageprocessing; import com.fasterxml.jackson.databind.JsonNode; import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; @@ -91,7 +91,7 @@ public MicoCloudEventImpl updateRouteHistoryWithFunctionCall(MicoCloud public void setMissingHeaderFields(MicoCloudEventImpl cloudEvent, String originalMessageId) { setMissingId(cloudEvent); - addMissingTime(cloudEvent); + setMissingTime(cloudEvent); if (!StringUtils.isEmpty(originalMessageId)) { setMissingCorrelationId(cloudEvent, originalMessageId); @@ -148,7 +148,7 @@ private void setMissingCorrelationId(MicoCloudEventImpl cloudEvent, St * Adds the required field 'time' if it is missing. 
* @param cloudEvent */ - private void addMissingTime(MicoCloudEventImpl cloudEvent) { + private void setMissingTime(MicoCloudEventImpl cloudEvent) { if (!cloudEvent.getTime().isPresent()) { cloudEvent.setTime(ZonedDateTime.now()); log.debug("Added missing time '{}' to cloud event", cloudEvent.getTime().orElse(null)); diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/FaasController.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/messageprocessing/FaasController.java similarity index 67% rename from src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/FaasController.java rename to src/main/java/io/github/ust/mico/kafkafaasconnector/messageprocessing/FaasController.java index cf57a22..48d1e86 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/FaasController.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/messageprocessing/FaasController.java @@ -17,7 +17,7 @@ * under the License. */ -package io.github.ust.mico.kafkafaasconnector.MessageProcessing; +package io.github.ust.mico.kafkafaasconnector.messageprocessing; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; @@ -58,26 +58,26 @@ public class FaasController { */ public List> callFaasFunction(MicoCloudEventImpl cloudEvent) throws MicoCloudEventException { if (!this.openFaaSConfig.isSkipFunctionCall()) { - try { - URL functionUrl = openFaaSConfig.getFunctionUrl(); - log.debug("Start request to function '{}'", functionUrl.toString()); - String cloudEventSerialized = Json.encode(cloudEventManipulator.updateRouteHistoryWithFunctionCall(cloudEvent, openFaaSConfig.getFunctionName())); - log.debug("Serialized cloud event: {}", cloudEventSerialized); - String result = restTemplate.postForObject(functionUrl.toString(), cloudEventSerialized, String.class); - log.debug("Faas call resulted in: '{}'", result); - return parseFunctionResult(result, cloudEvent); - } catch 
(MalformedURLException e) { - throw new MicoCloudEventException("Failed to call faas-function. Caused by: " + e.getMessage(), cloudEvent); - } catch (IllegalStateException e) { - log.error("Failed to serialize CloudEvent '{}'.", cloudEvent); - throw new MicoCloudEventException("Failed to serialize CloudEvent while calling the faas-function.", cloudEvent); - } catch (HttpStatusCodeException e) { - log.error("A client error occurred with http status:{} . These exceptions are triggered if the FaaS function does not return 200 OK as the status code", e.getStatusCode(), e); - throw new MicoCloudEventException(e.toString(), cloudEvent); - } - } else { return Collections.singletonList(cloudEvent); } + try { + URL functionUrl = openFaaSConfig.getFunctionUrl(); + log.debug("Start request to function '{}'", functionUrl.toString()); + String cloudEventSerialized = Json.encode(cloudEventManipulator.updateRouteHistoryWithFunctionCall(cloudEvent, openFaaSConfig.getFunctionName())); + log.debug("Serialized cloud event: {}", cloudEventSerialized); + String result = restTemplate.postForObject(functionUrl.toString(), cloudEventSerialized, String.class); + log.debug("Faas call resulted in: '{}'", result); + return parseFunctionResult(result, cloudEvent); + } catch (MalformedURLException e) { + throw new MicoCloudEventException("Failed to call faas-function. Caused by: " + e.getMessage(), cloudEvent); + } catch (IllegalStateException e) { + log.error("Failed to serialize CloudEvent '{}'.", cloudEvent); + throw new MicoCloudEventException("Failed to serialize CloudEvent while calling the faas-function.", cloudEvent); + } catch (HttpStatusCodeException e) { + log.error("A client error occurred with http status:{} . 
These exceptions are triggered if the FaaS function does not return 200 OK as the status code", e.getStatusCode(), e); + throw new MicoCloudEventException(e.toString(), cloudEvent); + } + } /** diff --git a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/KafkaMessageSender.java b/src/main/java/io/github/ust/mico/kafkafaasconnector/messageprocessing/KafkaMessageSender.java similarity index 99% rename from src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/KafkaMessageSender.java rename to src/main/java/io/github/ust/mico/kafkafaasconnector/messageprocessing/KafkaMessageSender.java index fe92d60..50c4d33 100644 --- a/src/main/java/io/github/ust/mico/kafkafaasconnector/MessageProcessing/KafkaMessageSender.java +++ b/src/main/java/io/github/ust/mico/kafkafaasconnector/messageprocessing/KafkaMessageSender.java @@ -17,7 +17,7 @@ * under the License. */ -package io.github.ust.mico.kafkafaasconnector.MessageProcessing; +package io.github.ust.mico.kafkafaasconnector.messageprocessing; import com.fasterxml.jackson.databind.JsonNode; import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java index 803e279..ff0c9f3 100644 --- a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java @@ -21,11 +21,10 @@ import com.fasterxml.jackson.databind.JsonNode; import io.cloudevents.json.Json; -import io.github.ust.mico.kafkafaasconnector.MessageProcessing.CloudEventManipulator; -import io.github.ust.mico.kafkafaasconnector.MessageProcessing.FaasController; -import io.github.ust.mico.kafkafaasconnector.MessageProcessing.KafkaMessageSender; +import io.github.ust.mico.kafkafaasconnector.messageprocessing.CloudEventManipulator; +import 
io.github.ust.mico.kafkafaasconnector.messageprocessing.FaasController; +import io.github.ust.mico.kafkafaasconnector.messageprocessing.KafkaMessageSender; import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; -import io.github.ust.mico.kafkafaasconnector.configuration.OpenFaaSConfig; import io.github.ust.mico.kafkafaasconnector.exception.MicoCloudEventException; import io.github.ust.mico.kafkafaasconnector.kafka.CloudEventDeserializer; import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; From 94cd8e8740f0f6d66f1a8eb9e4d5714c4a7f2c82 Mon Sep 17 00:00:00 2001 From: wagnerdk <18245675+wagnerdk@users.noreply.github.com> Date: Sat, 27 Jul 2019 17:10:46 +0200 Subject: [PATCH 12/13] Split tests into logical groups --- .../CloudEventDeserializerTests.java | 37 ++++++ .../CloudEventManipulatorTests.java | 40 ++++++ .../FaasControllerTests.java | 69 +++++++++++ .../KafkaMessageSenderTests.java | 83 +++++++++++++ .../MessageListenerTests.java | 114 ------------------ 5 files changed, 229 insertions(+), 114 deletions(-) create mode 100644 src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventDeserializerTests.java create mode 100644 src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulatorTests.java create mode 100644 src/test/java/io/github/ust/mico/kafkafaasconnector/FaasControllerTests.java create mode 100644 src/test/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSenderTests.java diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventDeserializerTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventDeserializerTests.java new file mode 100644 index 0000000..e29b77b --- /dev/null +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventDeserializerTests.java @@ -0,0 +1,37 @@ +package io.github.ust.mico.kafkafaasconnector; + + +import io.github.ust.mico.kafkafaasconnector.kafka.CloudEventDeserializer; +import 
org.apache.kafka.common.errors.SerializationException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; + +import java.nio.charset.Charset; + +public class CloudEventDeserializerTests { + + /** + * Tests message deserialization with a broken message + */ + @Test(expected = SerializationException.class) + public void testBrokenMessageDeserialization(){ + CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer(); + String invalidMessage = "InvalidMessage"; + cloudEventDeserializer.deserialize("",invalidMessage.getBytes(Charset.defaultCharset())); + } + + /** + * Tests message serialization with a empty but not null message + */ + @Test(expected = SerializationException.class) + public void testEmptyMessageSerialization(){ + CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer(); + byte[] message = {}; + cloudEventDeserializer.deserialize("",message); + } +} diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulatorTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulatorTests.java new file mode 100644 index 0000000..93858b3 --- /dev/null +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulatorTests.java @@ -0,0 +1,40 @@ +package io.github.ust.mico.kafkafaasconnector; + +import com.fasterxml.jackson.databind.JsonNode; +import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; +import io.github.ust.mico.kafkafaasconnector.messageprocessing.CloudEventManipulator; +import org.junit.Test; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static 
org.junit.Assert.assertThat; + +public class CloudEventManipulatorTests { + /** + * Don't load the application context to speed up testing + */ + + + /** + * Tests if the createdFrom attribute is set correctly + */ + @Test + public void testCreatedFrom() { + MicoCloudEventImpl cloudEventSimple = CloudEventTestUtils.basicCloudEventWithRandomId(); + final String originalMessageId = "OriginalMessageId"; + CloudEventManipulator cloudEventManipulator = new CloudEventManipulator(); + cloudEventManipulator.setMissingHeaderFields(cloudEventSimple, originalMessageId); + assertThat("If the id changes the createdFrom attribute has to be set", cloudEventSimple.getCreatedFrom().orElse(null), is(originalMessageId)); + } + + /** + * Tests if the createdFrom attribute is omitted if it is not necessary + */ + @Test + public void testNotCreatedFrom() { + MicoCloudEventImpl cloudEventSimple = CloudEventTestUtils.basicCloudEventWithRandomId(); + CloudEventManipulator cloudEventManipulator = new CloudEventManipulator(); + cloudEventManipulator.setMissingHeaderFields(cloudEventSimple, cloudEventSimple.getId()); + assertThat("If the id stays the same the createdFrom attribute must be empty", cloudEventSimple.getCreatedFrom().orElse(null), is(nullValue())); + } +} diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/FaasControllerTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/FaasControllerTests.java new file mode 100644 index 0000000..108f431 --- /dev/null +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/FaasControllerTests.java @@ -0,0 +1,69 @@ +package io.github.ust.mico.kafkafaasconnector; + +import com.fasterxml.jackson.databind.JsonNode; +import io.cloudevents.json.Json; +import io.github.ust.mico.kafkafaasconnector.exception.MicoCloudEventException; +import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; +import io.github.ust.mico.kafkafaasconnector.messageprocessing.FaasController; +import org.junit.ClassRule; +import 
org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.rule.EmbeddedKafkaRule; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.ArrayList; + +import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; + +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK) +@EnableAutoConfiguration +@AutoConfigureMockMvc +@ActiveProfiles("testing") +public class FaasControllerTests { + + /** + * We need the embedded Kafka to successfully create the context. + */ + private final EmbeddedKafkaBroker embeddedKafka = broker.getEmbeddedKafka(); + + // https://docs.spring.io/spring-kafka/docs/2.2.6.RELEASE/reference/html/#kafka-testing-junit4-class-rule + @ClassRule + public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false); + + @Autowired + FaasController faasController; + + @Test + public void parseEmptyFunctionResult() throws MicoCloudEventException { + ArrayList> result = this.faasController.parseFunctionResult("[]", null); + assertNotNull(result); + assertEquals(0, result.size()); + } + + @Test + public void parseFunctionResult() throws MicoCloudEventException { + MicoCloudEventImpl cloudEvent1 = CloudEventTestUtils.basicCloudEvent("CloudEvent1"); + MicoCloudEventImpl cloudEvent2 = CloudEventTestUtils.basicCloudEvent("CloudEvent2"); + ArrayList> input = new ArrayList<>(); + input.add(cloudEvent1); + input.add(cloudEvent2); + String functionInput = Json.encode(input); + ArrayList> result = this.faasController.parseFunctionResult(functionInput, 
null); + assertNotNull(result); + assertEquals(2, result.size()); + assertEquals(result.get(0).getId(), cloudEvent1.getId()); + assertEquals(result.get(0).getSource(), cloudEvent1.getSource()); + assertEquals(result.get(0).getType(), cloudEvent1.getType()); + assertTrue(result.get(0).getTime().get().isEqual(cloudEvent1.getTime().get())); + assertEquals(result.get(1).getId(), cloudEvent2.getId()); + assertEquals(result.get(0).getRoutingSlip(), cloudEvent2.getRoutingSlip()); + } +} diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSenderTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSenderTests.java new file mode 100644 index 0000000..40fb7ce --- /dev/null +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSenderTests.java @@ -0,0 +1,83 @@ +package io.github.ust.mico.kafkafaasconnector; + +import com.fasterxml.jackson.databind.JsonNode; +import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; +import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; +import io.github.ust.mico.kafkafaasconnector.messageprocessing.KafkaMessageSender; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.rule.EmbeddedKafkaRule; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.junit.Assert.assertFalse; +import 
static org.junit.Assert.assertTrue; + +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK) +@EnableAutoConfiguration +@AutoConfigureMockMvc +@ActiveProfiles("testing") +public class KafkaMessageSenderTests { + + @Autowired + KafkaMessageSender kafkaMessageSender; + + /** + * We need the embedded Kafka to successfully create the context. + */ + private final EmbeddedKafkaBroker embeddedKafka = broker.getEmbeddedKafka(); + + // https://docs.spring.io/spring-kafka/docs/2.2.6.RELEASE/reference/html/#kafka-testing-junit4-class-rule + @ClassRule + public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false); + + + /** + * Tests if the need to filter out a message with "isTestMessage = true" and a "FilterOutBeforeTopic = message destination" + * is correctly recognized. + */ + @Test + public void testFilterOutCheck() { + MicoCloudEventImpl cloudEventSimple = CloudEventTestUtils.basicCloudEventWithRandomId(); + String testFilterTopic = "TestFilterTopic"; + cloudEventSimple.setFilterOutBeforeTopic(testFilterTopic); + cloudEventSimple.setIsTestMessage(true); + + assertTrue("The message should be filtered out", kafkaMessageSender.isTestMessageCompleted(cloudEventSimple, testFilterTopic)); + } + + /** + * Only "FilterOutBeforeTopic = message destination" is not enough to filter a message out. It needs to be a test message. + */ + @Test + public void testNotFilterOutCheck() { + MicoCloudEventImpl cloudEventSimple = CloudEventTestUtils.basicCloudEventWithRandomId(); + String testFilterTopic = "TestFilterTopic"; + cloudEventSimple.setFilterOutBeforeTopic(testFilterTopic); + + assertFalse("The message not should be filtered out, because it is not a test message", kafkaMessageSender.isTestMessageCompleted(cloudEventSimple, testFilterTopic)); + } + + /** + * Tests if a test message with "FilterOutBeforeTopic != message destination" will wrongly be filtered out. 
+ */ + @Test + public void testNotFilterOutCheckDifferentTopics() { + MicoCloudEventImpl cloudEventSimple = CloudEventTestUtils.basicCloudEventWithRandomId(); + String testFilterTopic = "TestFilterTopic"; + cloudEventSimple.setFilterOutBeforeTopic(testFilterTopic); + cloudEventSimple.setIsTestMessage(true); + + assertFalse("The message not should be filtered out, because it has not reached the filter out topic", kafkaMessageSender.isTestMessageCompleted(cloudEventSimple, testFilterTopic + "Difference")); + } +} diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java index ff0c9f3..76defc1 100644 --- a/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/MessageListenerTests.java @@ -83,15 +83,6 @@ public class MessageListenerTests { @Autowired MessageListener messageListener; - @Autowired - FaasController faasController; - - @Autowired - KafkaMessageSender kafkaMessageSender; - - @Autowired - CloudEventManipulator cloudEventManipulator; - private MicoKafkaTestHelper micoKafkaTestHelper; @PostConstruct @@ -107,32 +98,6 @@ public void before() { requiredTopics.forEach(topic -> embeddedKafka.addTopics(topic)); } - @Test - public void parseEmptyFunctionResult() throws MicoCloudEventException { - ArrayList> result = this.faasController.parseFunctionResult("[]", null); - assertNotNull(result); - assertEquals(0, result.size()); - } - - @Test - public void parseFunctionResult() throws MicoCloudEventException { - MicoCloudEventImpl cloudEvent1 = CloudEventTestUtils.basicCloudEvent("CloudEvent1"); - MicoCloudEventImpl cloudEvent2 = CloudEventTestUtils.basicCloudEvent("CloudEvent2"); - ArrayList> input = new ArrayList<>(); - input.add(cloudEvent1); - input.add(cloudEvent2); - String functionInput = Json.encode(input); - ArrayList> result = 
this.faasController.parseFunctionResult(functionInput, null); - assertNotNull(result); - assertEquals(2, result.size()); - assertEquals(result.get(0).getId(), cloudEvent1.getId()); - assertEquals(result.get(0).getSource(), cloudEvent1.getSource()); - assertEquals(result.get(0).getType(), cloudEvent1.getType()); - assertTrue(result.get(0).getTime().get().isEqual(cloudEvent1.getTime().get())); - assertEquals(result.get(1).getId(), cloudEvent2.getId()); - assertEquals(result.get(0).getRoutingSlip(), cloudEvent2.getRoutingSlip()); - } - /** * Test that expired cloud events actually are ignored. */ @@ -226,45 +191,6 @@ public void testRouteHistory() { MicoKafkaTestHelper.unsubscribeConsumer(consumer); } - /** - * Tests if the need to filter out a message with "isTestMessage = true" and a "FilterOutBeforeTopic = message destination" - * is correctly recognized. - */ - @Test - public void testFilterOutCheck() { - MicoCloudEventImpl cloudEventSimple = CloudEventTestUtils.basicCloudEventWithRandomId(); - String testFilterTopic = "TestFilterTopic"; - cloudEventSimple.setFilterOutBeforeTopic(testFilterTopic); - cloudEventSimple.setIsTestMessage(true); - - assertTrue("The message should be filtered out", kafkaMessageSender.isTestMessageCompleted(cloudEventSimple, testFilterTopic)); - } - - /** - * Only "FilterOutBeforeTopic = message destination" is not enough to filter a message out. It needs to be a test message. - */ - @Test - public void testNotFilterOutCheck() { - MicoCloudEventImpl cloudEventSimple = CloudEventTestUtils.basicCloudEventWithRandomId(); - String testFilterTopic = "TestFilterTopic"; - cloudEventSimple.setFilterOutBeforeTopic(testFilterTopic); - - assertFalse("The message not should be filtered out, because it is not a test message", kafkaMessageSender.isTestMessageCompleted(cloudEventSimple, testFilterTopic)); - } - - /** - * Tests if a test message with "FilterOutBeforeTopic != message destination" will wrongly be filtered out. 
- */ - @Test - public void testNotFilterOutCheckDifferentTopics() { - MicoCloudEventImpl cloudEventSimple = CloudEventTestUtils.basicCloudEventWithRandomId(); - String testFilterTopic = "TestFilterTopic"; - cloudEventSimple.setFilterOutBeforeTopic(testFilterTopic); - cloudEventSimple.setIsTestMessage(true); - - assertFalse("The message not should be filtered out, because it has not reached the filter out topic", kafkaMessageSender.isTestMessageCompleted(cloudEventSimple, testFilterTopic + "Difference")); - } - /** * Test if a not test message is filtered out */ @@ -384,46 +310,6 @@ public void testCorrelationIdUnChanged() { MicoKafkaTestHelper.unsubscribeConsumer(consumer); } - /** - * Tests if the createdFrom attribute is set correctly - */ - @Test - public void testCreatedFrom() { - MicoCloudEventImpl cloudEventSimple = CloudEventTestUtils.basicCloudEventWithRandomId(); - final String originalMessageId = "OriginalMessageId"; - cloudEventManipulator.setMissingHeaderFields(cloudEventSimple, originalMessageId); - assertThat("If the id changes the createdFrom attribute has to be set", cloudEventSimple.getCreatedFrom().orElse(null), is(originalMessageId)); - } - /** - * Tests if the createdFrom attribute is omitted if it is not necessary - */ - @Test - public void testNotCreatedFrom() { - MicoCloudEventImpl cloudEventSimple = CloudEventTestUtils.basicCloudEventWithRandomId(); - cloudEventManipulator.setMissingHeaderFields(cloudEventSimple, cloudEventSimple.getId()); - assertThat("If the id stays the same the createdFrom attribute must be empty", cloudEventSimple.getCreatedFrom().orElse(null), is(nullValue())); - } - - - /** - * Tests message deserialization with a broken message - */ - @Test(expected = SerializationException.class) - public void testBrokenMessageDeserialization(){ - CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer(); - String invalidMessage = "InvalidMessage"; - 
cloudEventDeserializer.deserialize("",invalidMessage.getBytes(Charset.defaultCharset())); - } - - /** - * Tests message serialization with a empty but not null message - */ - @Test(expected = SerializationException.class) - public void testEmptyMessageSerialization(){ - CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer(); - byte[] message = {}; - cloudEventDeserializer.deserialize("",message); - } } From 36055fad977a9f3b379f5d05e96360c7559308d7 Mon Sep 17 00:00:00 2001 From: wagnerdk <18245675+wagnerdk@users.noreply.github.com> Date: Sat, 27 Jul 2019 17:26:22 +0200 Subject: [PATCH 13/13] Add missing license --- .../CloudEventDeserializerTests.java | 24 ++++++++++++++----- .../CloudEventManipulatorTests.java | 18 ++++++++++++++ .../FaasControllerTests.java | 18 ++++++++++++++ .../KafkaMessageSenderTests.java | 22 +++++++++++++---- .../MicoKafkaTestHelper.java | 18 ++++++++++++++ 5 files changed, 90 insertions(+), 10 deletions(-) diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventDeserializerTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventDeserializerTests.java index e29b77b..91043f5 100644 --- a/src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventDeserializerTests.java +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventDeserializerTests.java @@ -1,15 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package io.github.ust.mico.kafkafaasconnector; import io.github.ust.mico.kafkafaasconnector.kafka.CloudEventDeserializer; import org.apache.kafka.common.errors.SerializationException; import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.ActiveProfiles; -import org.springframework.test.context.junit4.SpringRunner; import java.nio.charset.Charset; diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulatorTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulatorTests.java index 93858b3..03c3f78 100644 --- a/src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulatorTests.java +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/CloudEventManipulatorTests.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package io.github.ust.mico.kafkafaasconnector; import com.fasterxml.jackson.databind.JsonNode; diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/FaasControllerTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/FaasControllerTests.java index 108f431..657e2a7 100644 --- a/src/test/java/io/github/ust/mico/kafkafaasconnector/FaasControllerTests.java +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/FaasControllerTests.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ package io.github.ust.mico.kafkafaasconnector; import com.fasterxml.jackson.databind.JsonNode; diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSenderTests.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSenderTests.java index 40fb7ce..5184f61 100644 --- a/src/test/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSenderTests.java +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/KafkaMessageSenderTests.java @@ -1,7 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ package io.github.ust.mico.kafkafaasconnector; import com.fasterxml.jackson.databind.JsonNode; -import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig; import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl; import io.github.ust.mico.kafkafaasconnector.messageprocessing.KafkaMessageSender; import org.junit.ClassRule; @@ -11,11 +28,8 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; -import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner; diff --git a/src/test/java/io/github/ust/mico/kafkafaasconnector/MicoKafkaTestHelper.java b/src/test/java/io/github/ust/mico/kafkafaasconnector/MicoKafkaTestHelper.java index 7eb546d..208d1b3 100644 --- a/src/test/java/io/github/ust/mico/kafkafaasconnector/MicoKafkaTestHelper.java +++ b/src/test/java/io/github/ust/mico/kafkafaasconnector/MicoKafkaTestHelper.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package io.github.ust.mico.kafkafaasconnector; import com.fasterxml.jackson.databind.JsonNode;