Skip to content

Commit

Permalink
Merge pull request #17 from UST-MICO/fix/mico#762-refactor-kafka-faas…
Browse files Browse the repository at this point in the history
…-connector

Fix/mico#762 refactor kafka faas connector
  • Loading branch information
wagnerdk authored Jul 27, 2019
2 parents 11ed464 + d3ecd07 commit 6b467fc
Show file tree
Hide file tree
Showing 11 changed files with 778 additions and 346 deletions.
278 changes: 22 additions & 256 deletions src/main/java/io/github/ust/mico/kafkafaasconnector/MessageListener.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ public MicoCloudEventImpl<JsonNode> deserialize(String topic, byte[] data) {
log.debug("Trying to parse the message:" + message);
MicoCloudEventImpl<JsonNode> micoCloudEvent = Json.decodeValue(message, new TypeReference<MicoCloudEventImpl<JsonNode>>() {
});
log.debug("Deserialized micoCloudEvent '{}' on topic: '{}'",micoCloudEvent.toString(),topic);
log.debug("Deserialized micoCloudEvent '{}' on topic: '{}'", micoCloudEvent.toString(), topic);

if (!micoCloudEvent.getData().isPresent()) {
// data is entirely optional
log.debug("Received message does not include any data!");
}
return micoCloudEvent;
} catch (IllegalStateException e) {
throw new SerializationException("Could not create an CloudEvent message", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.github.ust.mico.kafkafaasconnector.messageprocessing;

import com.fasterxml.jackson.databind.JsonNode;
import io.github.ust.mico.kafkafaasconnector.configuration.KafkaConfig;
import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl;
import io.github.ust.mico.kafkafaasconnector.kafka.RouteHistory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.net.URI;
import java.net.URISyntaxException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;

@Slf4j
@Service
public class CloudEventManipulator {

private static final String ROUTE_HISTORY_TYPE_TOPIC = "topic";
private static final String ROUTE_HISTORY_TYPE_FAAS_FUNCTION = "faas-function";

@Autowired
private KafkaConfig kafkaConfig;

/**
* Add a topic routing step to the routing history of the cloud event.
*
* @param cloudEvent the cloud event to update
* @param topic the next topic the event will be sent to
* @return the updated cloud event
*/
public MicoCloudEventImpl<JsonNode> updateRouteHistoryWithTopic(MicoCloudEventImpl<JsonNode> cloudEvent, String topic) {
return this.updateRouteHistory(cloudEvent, topic, ROUTE_HISTORY_TYPE_TOPIC);
}

/**
* Update the routing history in the `route` header field of the cloud event.
*
* @param cloudEvent the cloud event to update
* @param id the string id of the next routing step the message will take
* @param type the type of the routing step ("topic" or "faas-function")
* @return the updated cloud event
*/
public MicoCloudEventImpl<JsonNode> updateRouteHistory(MicoCloudEventImpl<JsonNode> cloudEvent, String id, String type) {
RouteHistory routingStep = new RouteHistory(type, id, ZonedDateTime.now());
List<RouteHistory> history = cloudEvent.getRoute().map(ArrayList::new).orElse(new ArrayList<>());
history.add(routingStep);
return new MicoCloudEventImpl<>(cloudEvent).setRoute(history);
}

/**
* Add a function call routing step to the routing history of the cloud event.
*
* @param cloudEvent the cloud event to update
* @param functionId the id of the function applied to the cloud event next
* @return the updated cloud event
*/
public MicoCloudEventImpl<JsonNode> updateRouteHistoryWithFunctionCall(MicoCloudEventImpl<JsonNode> cloudEvent, String functionId) {
return this.updateRouteHistory(cloudEvent, functionId, ROUTE_HISTORY_TYPE_FAAS_FUNCTION);
}

/**
* Sets the time, the correlationId and the Id field of a CloudEvent message if missing
*
* @param cloudEvent the cloud event to send
* @param originalMessageId the id of the original message
*/
public void setMissingHeaderFields(MicoCloudEventImpl<JsonNode> cloudEvent, String originalMessageId) {

setMissingId(cloudEvent);
setMissingTime(cloudEvent);

if (!StringUtils.isEmpty(originalMessageId)) {
setMissingCorrelationId(cloudEvent, originalMessageId);
setMissingCreatedFrom(cloudEvent, originalMessageId);
}

// Add source if it is an error message, e.g.: kafka://mico/transform-request
if (cloudEvent.isErrorMessage().orElse(false)) {
setMissingSource(cloudEvent);
}
}

/**
* Sets the source field of an cloud event message to "kafka://{groupId}/{inputTopic}".
*
* @param cloudEvent
*/
private void setMissingSource(MicoCloudEventImpl<JsonNode> cloudEvent) {
try {
URI source = new URI("kafka://" + this.kafkaConfig.getGroupId() + "/" + this.kafkaConfig.getInputTopic());
cloudEvent.setSource(source);
} catch (URISyntaxException e) {
log.error("Could not construct a valid source attribute for the error message. " +
"Caused by: {}", e.getMessage());
}
}

/**
* Adds the createdFrom field to the message if the messageId is different from the originalMessageId and
* the createdFrom field is empty.
*
* @param cloudEvent
* @param originalMessageId
*/
private void setMissingCreatedFrom(MicoCloudEventImpl<JsonNode> cloudEvent, String originalMessageId) {
if (!cloudEvent.getId().equals(originalMessageId)) {
if (!cloudEvent.isErrorMessage().orElse(false) ||
(cloudEvent.isErrorMessage().orElse(false) && StringUtils.isEmpty(cloudEvent.getCreatedFrom().orElse("")))) {
cloudEvent.setCreatedFrom(originalMessageId);
}
}
}

/**
* Sets the message correlationId to the originalMessageId if the correlationId is missing
*
* @param cloudEvent
* @param originalMessageId
*/
private void setMissingCorrelationId(MicoCloudEventImpl<JsonNode> cloudEvent, String originalMessageId) {
if (!cloudEvent.getCorrelationId().isPresent()) {
cloudEvent.setCorrelationId(originalMessageId);
}
}

/**
* Adds the required field 'time' if it is missing.
*
* @param cloudEvent
*/
private void setMissingTime(MicoCloudEventImpl<JsonNode> cloudEvent) {
if (!cloudEvent.getTime().isPresent()) {
cloudEvent.setTime(ZonedDateTime.now());
log.debug("Added missing time '{}' to cloud event", cloudEvent.getTime().orElse(null));
}
}

/**
* Sets a missing message id to a randomly generated one.
*
* @param cloudEvent
*/
private void setMissingId(MicoCloudEventImpl<JsonNode> cloudEvent) {
if (StringUtils.isEmpty(cloudEvent.getId())) {
cloudEvent.setRandomId();
log.debug("Added missing id '{}' to cloud event", cloudEvent.getId());
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.github.ust.mico.kafkafaasconnector.messageprocessing;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import io.cloudevents.json.Json;
import io.github.ust.mico.kafkafaasconnector.configuration.OpenFaaSConfig;
import io.github.ust.mico.kafkafaasconnector.exception.MicoCloudEventException;
import io.github.ust.mico.kafkafaasconnector.kafka.MicoCloudEventImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpStatusCodeException;
import org.springframework.web.client.RestTemplate;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Slf4j
@Service
public class FaasController {

@Autowired
private CloudEventManipulator cloudEventManipulator;

@Autowired
private OpenFaaSConfig openFaaSConfig;

@Autowired
private RestTemplate restTemplate;

/**
* Synchronously call the configured openFaaS function.
*
* @param cloudEvent the cloud event used as parameter for the function
* @return the result of the function call (in serialized form)
*/
public List<MicoCloudEventImpl<JsonNode>> callFaasFunction(MicoCloudEventImpl<JsonNode> cloudEvent) throws MicoCloudEventException {
if (this.openFaaSConfig.isSkipFunctionCall()) {
return Collections.singletonList(cloudEvent);
}
URL functionUrl = null;
try {
functionUrl = openFaaSConfig.getFunctionUrl();
log.debug("Start request to function '{}'", functionUrl.toString());
String cloudEventSerialized = Json.encode(cloudEventManipulator.updateRouteHistoryWithFunctionCall(cloudEvent, openFaaSConfig.getFunctionName()));
log.debug("Serialized cloud event: {}", cloudEventSerialized);
String result = restTemplate.postForObject(functionUrl.toString(), cloudEventSerialized, String.class);
log.debug("Faas call resulted in: '{}'", result);
return parseFunctionResult(result, cloudEvent);
} catch (MalformedURLException e) {
throw new MicoCloudEventException("Failed to call faas-function. Caused by: " + e.getMessage(), cloudEvent);
} catch (IllegalStateException e) {
log.error("Failed to serialize CloudEvent '{}'.", cloudEvent);
throw new MicoCloudEventException("Failed to serialize CloudEvent while calling the faas-function.", cloudEvent);
} catch (HttpStatusCodeException e) {
log.error("FaaS function '{}' returned http status code '{}'. Expected 200 OK.", functionUrl, e.getStatusCode());
throw new MicoCloudEventException(e.toString(), cloudEvent);
}
}

/**
* Parse the result of a faas function call.
*
* @param sourceCloudEvent only used for better error messages
* @return an ArrayList of cloud events
*/
public ArrayList<MicoCloudEventImpl<JsonNode>> parseFunctionResult(String functionResult, MicoCloudEventImpl<JsonNode> sourceCloudEvent) throws MicoCloudEventException {
try {
return Json.decodeValue(functionResult, new TypeReference<ArrayList<MicoCloudEventImpl<JsonNode>>>() {
});
} catch (IllegalStateException e) {
log.error("Failed to parse JSON from response '{}'.", functionResult);
throw new MicoCloudEventException("Failed to parse JSON from response from the faas-function.", sourceCloudEvent);
}
}

}
Loading

0 comments on commit 6b467fc

Please sign in to comment.