Commit 0037164

[SDCISA-16147, swisspost#583] Quick-n-dirty: just waste another thread's time for JSON serialization.

Update:
- Fix hard-to-trace vertxPromise NPE.

Related: SDCISA-15633, SDCISA-15833, SDCISA-16147, swisspost/vertx-redisques#170, swisspost/vertx-redisques#177, swisspost/vertx-rest-storage#186, swisspost#577, swisspost/vertx-redisques#181, swisspost#493, swisspost/vertx-rest-storage#188, swisspost#583
hiddenalpha committed Jun 4, 2024
1 parent acbcdac commit 0037164
Showing 3 changed files with 187 additions and 6 deletions.
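
The central change is to stop deserializing the JSON request payload on the Vert.x event loop and hand that work to a worker thread instead. A minimal sketch of the pattern, assuming the Callable-based executeBlocking API (Vert.x 4.5+) that the new buildRecordsAsync below also uses; the class and variable names here are illustrative only, not part of this commit:

    import io.vertx.core.Future;
    import io.vertx.core.Vertx;
    import io.vertx.core.buffer.Buffer;
    import io.vertx.core.json.JsonObject;

    public class ExecuteBlockingSketch {
        public static void main(String[] args) {
            Vertx vertx = Vertx.vertx();
            Buffer payload = Buffer.buffer("{\"records\":[]}");
            // Parse the JSON on a worker thread so the event loop stays responsive.
            Future<JsonObject> parsed = vertx.executeBlocking(() -> new JsonObject(payload));
            parsed.onSuccess(json -> System.out.println("parsed off the event loop: " + json))
                  .onFailure(Throwable::printStackTrace)
                  .onComplete(done -> vertx.close());
        }
    }
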
@@ -2,6 +2,7 @@

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
@@ -12,6 +13,7 @@
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceConsumer;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager;
import org.swisspush.gateleen.core.exception.GateleenExceptionFactory;
import org.swisspush.gateleen.core.http.RequestLoggerFactory;
import org.swisspush.gateleen.core.util.ResponseStatusCodeLogUtil;
import org.swisspush.gateleen.core.util.StatusCode;
@@ -25,6 +27,8 @@
import java.util.Optional;
import java.util.regex.Pattern;

import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory;

/**
* Handler class for all Kafka related requests.
*
@@ -42,45 +46,77 @@ public class KafkaHandler extends ConfigurationResourceConsumer {
private final Logger log = LoggerFactory.getLogger(KafkaHandler.class);

private final String streamingPath;
private final GateleenExceptionFactory exceptionFactory;
private final KafkaProducerRepository repository;
private final KafkaTopicExtractor topicExtractor;
private final KafkaMessageSender kafkaMessageSender;
private final Map<String, Object> properties;
private final KafkaProducerRecordBuilder kafkaProducerRecordBuilder;
private KafkaMessageValidator kafkaMessageValidator;

private boolean initialized = false;

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) {
this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath);
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator,
KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri,
String streamingPath) {
this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender,
configResourceUri, streamingPath, new HashMap<>());
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {

this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath, properties);
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {
this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager,
kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath,
properties);
log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!");
}

public KafkaHandler(
Vertx vertx,
GateleenExceptionFactory exceptionFactory,
ConfigurationResourceManager configurationResourceManager,
KafkaMessageValidator kafkaMessageValidator,
KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender,
String configResourceUri,
String streamingPath,
Map<String, Object> properties
) {
super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema");
this.exceptionFactory = exceptionFactory;
this.repository = repository;
this.kafkaMessageValidator = kafkaMessageValidator;
this.kafkaMessageSender = kafkaMessageSender;
this.streamingPath = streamingPath;
this.properties = properties;

this.topicExtractor = new KafkaTopicExtractor(streamingPath);
this.kafkaProducerRecordBuilder = new KafkaProducerRecordBuilder(vertx, exceptionFactory);
}

public static KafkaHandlerBuilder builder() {
return new KafkaHandlerBuilder();
}

public Future<Void> initialize() {
@@ -140,9 +176,11 @@ public boolean handle(final HttpServerRequest request) {
}

request.bodyHandler(payload -> {
try {
log.debug("incoming kafka message payload: {}", payload);
final List<KafkaProducerRecord<String, String>> kafkaProducerRecords = KafkaProducerRecordBuilder.buildRecords(topic, payload);
log.debug("incoming kafka message payload: {}", payload);
// TODO refactor away this callback hell (this applies to the COMPLETE method
// surrounding this line, 'KafkaHandler.handle()', NOT only the lines below).
kafkaProducerRecordBuilder.buildRecordsAsync(topic, payload).compose((List<KafkaProducerRecord<String, String>> kafkaProducerRecords) -> {
maybeValidate(request, kafkaProducerRecords).onComplete(validationEvent -> {
if(validationEvent.succeeded()) {
if(validationEvent.result().isSuccess()) {
@@ -162,9 +200,15 @@ public boolean handle(final HttpServerRequest request) {
respondWith(StatusCode.INTERNAL_SERVER_ERROR, validationEvent.cause().getMessage(), request);
}
});
} catch (ValidationException ve){
respondWith(StatusCode.BAD_REQUEST, ve.getMessage(), request);
}
return Future.succeededFuture();
}).onFailure((Throwable ex) -> {
if (ex instanceof ValidationException) {
respondWith(StatusCode.BAD_REQUEST, ex.getMessage(), request);
return;
}
log.error("TODO error handling", exceptionFactory.newException(ex));
respondWith(StatusCode.INTERNAL_SERVER_ERROR, ex.getMessage(), request);
});
});
return true;
}
@@ -0,0 +1,85 @@
package org.swisspush.gateleen.kafka;

import io.vertx.core.Vertx;
import org.slf4j.Logger;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager;
import org.swisspush.gateleen.core.exception.GateleenExceptionFactory;

import java.util.Map;

import static org.slf4j.LoggerFactory.getLogger;
import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory;

public class KafkaHandlerBuilder {

private static final Logger log = getLogger(KafkaHandlerBuilder.class);
private Vertx vertx;
private GateleenExceptionFactory exceptionFactory;
private ConfigurationResourceManager configurationResourceManager;
private KafkaMessageValidator kafkaMessageValidator;
private KafkaProducerRepository repository;
private KafkaMessageSender kafkaMessageSender;
private String configResourceUri;
private String streamingPath;
private Map<String, Object> properties;

/** Use {@link KafkaHandler#builder()} */
KafkaHandlerBuilder() {/**/}

public KafkaHandler build() {
if (vertx == null) throw new NullPointerException("vertx missing");
if (exceptionFactory == null) exceptionFactory = newGateleenThriftyExceptionFactory();
if (repository == null) throw new NullPointerException("kafkaProducerRepository missing");
if (kafkaMessageSender == null) throw new NullPointerException("kafkaMessageSender missing");
if (streamingPath == null) log.warn("no 'streamingPath' given. Are you sure you want none?");
return new KafkaHandler(
vertx, exceptionFactory, configurationResourceManager, kafkaMessageValidator, repository,
kafkaMessageSender, configResourceUri, streamingPath, properties);
}

public KafkaHandlerBuilder withVertx(Vertx vertx) {
this.vertx = vertx;
return this;
}

public KafkaHandlerBuilder withExceptionFactory(GateleenExceptionFactory exceptionFactory) {
this.exceptionFactory = exceptionFactory;
return this;
}

public KafkaHandlerBuilder withConfigurationResourceManager(ConfigurationResourceManager configurationResourceManager) {
this.configurationResourceManager = configurationResourceManager;
return this;
}

public KafkaHandlerBuilder withKafkaMessageValidator(KafkaMessageValidator kafkaMessageValidator) {
this.kafkaMessageValidator = kafkaMessageValidator;
return this;
}

public KafkaHandlerBuilder withRepository(KafkaProducerRepository repository) {
this.repository = repository;
return this;
}

public KafkaHandlerBuilder withKafkaMessageSender(KafkaMessageSender kafkaMessageSender) {
this.kafkaMessageSender = kafkaMessageSender;
return this;
}

public KafkaHandlerBuilder withConfigResourceUri(String configResourceUri) {
this.configResourceUri = configResourceUri;
return this;
}

public KafkaHandlerBuilder withStreamingPath(String streamingPath) {
this.streamingPath = streamingPath;
return this;
}

public KafkaHandlerBuilder withProperties(Map<String, Object> properties) {
this.properties = properties;
return this;
}

}
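
For illustration, a hypothetical wiring of the new builder; the collaborator variables and the two path strings are placeholders, not values taken from this commit:

    KafkaHandler kafkaHandler = KafkaHandler.builder()
            .withVertx(vertx)
            .withConfigurationResourceManager(configurationResourceManager)
            .withRepository(kafkaProducerRepository)
            .withKafkaMessageSender(kafkaMessageSender)
            .withConfigResourceUri("/server/admin/v1/kafka/topicsConfig") // placeholder URI
            .withStreamingPath("/server/streaming/")                      // placeholder path
            .withProperties(new HashMap<>())
            .build();

Per build() above, only vertx, the producer repository and the message sender are mandatory; a missing exception factory falls back to newGateleenThriftyExceptionFactory.
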
@@ -1,26 +1,45 @@
package org.swisspush.gateleen.kafka;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.slf4j.Logger;
import org.swisspush.gateleen.core.exception.GateleenExceptionFactory;
import org.swisspush.gateleen.validation.ValidationException;

import java.util.ArrayList;
import java.util.List;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static org.slf4j.LoggerFactory.getLogger;

/**
* Creates {@link KafkaProducerRecord}s by parsing the request payload.
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
class KafkaProducerRecordBuilder {

private static final Logger log = getLogger(KafkaProducerRecordBuilder.class);
private static final String RECORDS = "records";
private static final String KEY = "key";
private static final String VALUE = "value";
private static final String HEADERS = "headers";
private final Vertx vertx;
private final GateleenExceptionFactory exceptionFactory;

KafkaProducerRecordBuilder(
Vertx vertx,
GateleenExceptionFactory exceptionFactory
) {
this.vertx = vertx;
this.exceptionFactory = exceptionFactory;
}

/**
* Builds a list of {@link KafkaProducerRecord}s based on the provided payload.
@@ -32,6 +51,39 @@ class KafkaProducerRecordBuilder {
* @return A list of {@link KafkaProducerRecord}s created from the provided payload
* @throws ValidationException when the payload is not valid (missing properties, wrong types, etc.)
*/
Future<List<KafkaProducerRecord<String, String>>> buildRecordsAsync(String topic, Buffer payload) {
return Future.<Void>succeededFuture().compose((Void v) -> {
JsonObject payloadObj;
try {
payloadObj = new JsonObject(payload);
} catch (DecodeException de) {
return Future.failedFuture(new ValidationException("Error while parsing payload", de));
}
JsonArray recordsArray;
try {
recordsArray = payloadObj.getJsonArray(RECORDS);
} catch (ClassCastException cce) {
return Future.failedFuture(new ValidationException("Property '" + RECORDS + "' must be of type JsonArray holding JsonObject objects"));
}
if (recordsArray == null) {
return Future.failedFuture(new ValidationException("Missing 'records' array"));
}
return vertx.executeBlocking(() -> {
assert !currentThread().getName().toUpperCase().contains("EVENTLOOP") : currentThread().getName();
long beginEpochMs = currentTimeMillis();
List<KafkaProducerRecord<String, String>> kafkaProducerRecords = new ArrayList<>(recordsArray.size());
for (int i = 0; i < recordsArray.size(); i++) {
kafkaProducerRecords.add(fromRecordJsonObject(topic, recordsArray.getJsonObject(i)));
}
long durationMs = currentTimeMillis() - beginEpochMs;
log.debug("Serializing JSON did block thread for {}ms", durationMs);
return kafkaProducerRecords;
});
});
}
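
For orientation, a sketch of a payload that buildRecordsAsync accepts and how a caller inside the same package (the class is package-private) might consume the returned future. The per-record fields key, value and headers are inferred from the constants above; fromRecordJsonObject is not part of this diff, so the exact field handling is an assumption, and recordBuilder, log and the literal values are illustrative:

    JsonObject record = new JsonObject()
            .put("key", "order-4711")                                  // hypothetical record key
            .put("value", "{\"status\":\"created\"}")                  // hypothetical record value
            .put("headers", new JsonObject().put("x-source", "demo")); // hypothetical headers
    Buffer payload = new JsonObject()
            .put("records", new JsonArray().add(record))
            .toBuffer();
    recordBuilder.buildRecordsAsync("my-topic", payload)
            .onSuccess(records -> log.debug("built {} records", records.size()))
            .onFailure(ex -> log.warn("building records failed", ex));
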

/** @deprecated Use {@link #buildRecordsAsync(String, Buffer)}. */
@Deprecated
static List<KafkaProducerRecord<String, String>> buildRecords(String topic, Buffer payload) throws ValidationException {
List<KafkaProducerRecord<String, String>> kafkaProducerRecords = new ArrayList<>();
JsonObject payloadObj;