Merge pull request #395 from Cyril-Edme/ISS-330-send-headers-while-producing-to-kafka

Iss 330 send headers while producing to kafka
authorjapps authored May 16, 2020
2 parents 838ee34 + c19a5a4 commit 15bc03c
Showing 17 changed files with 438 additions and 22 deletions.
@@ -1,16 +1,79 @@
package org.jsmart.zerocode.core.di.provider;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
import com.google.gson.TypeAdapterFactory;
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import com.google.gson.stream.JsonWriter;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

import javax.inject.Provider;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class GsonSerDeProvider implements Provider<Gson> {

@Override
public Gson get() {
return new GsonBuilder()
.registerTypeAdapterFactory(KafkaHeadersAdapter.FACTORY)
.create();
}

static class KafkaHeadersAdapter extends TypeAdapter<Headers> {

static final TypeAdapterFactory FACTORY = new TypeAdapterFactory() {
@SuppressWarnings("unchecked")
@Override
public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
if (type.getRawType() == Headers.class) {
return (TypeAdapter<T>) new KafkaHeadersAdapter(gson);
}
return null;
}
};

private final Gson gson;

public KafkaHeadersAdapter(Gson gson) {
this.gson = gson;
}

@Override
public void write(JsonWriter writer, Headers value) throws IOException {
if (value == null || !value.iterator().hasNext()) {
writer.nullValue();
} else {
Map<String, String> headers = new HashMap<>();
value.forEach(header -> headers.put(header.key(), new String(header.value())));
gson.getAdapter(Map.class).write(writer, headers);
}
}

@Override
public Headers read(JsonReader reader) throws IOException {
Headers headers = null;
JsonToken peek = reader.peek();
if (JsonToken.NULL.equals(peek)) {
reader.nextNull();
} else {
Map<String, String> map = gson.getAdapter(Map.class).read(reader);

headers = new RecordHeaders();
for (Map.Entry<String, String> entry : map.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
headers.add(key, value == null ? null : value.getBytes());
}
}

return headers;
}
}

}
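For illustration only, not part of this commit: a minimal sketch of how the registered adapter round-trips Kafka Headers through JSON. The header key and value below are made-up examples.

import com.google.gson.Gson;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class KafkaHeadersAdapterSketch {
    public static void main(String[] args) {
        Gson gson = new GsonSerDeProvider().get();

        // write(...) flattens Headers into a plain JSON object of string pairs.
        Headers headers = new RecordHeaders().add("traceId", "abc-123".getBytes());
        String json = gson.toJson(headers, Headers.class); // {"traceId":"abc-123"}

        // read(...) rebuilds a RecordHeaders instance from that JSON object.
        Headers parsed = gson.fromJson(json, Headers.class);
        System.out.println(json + " -> " + parsed);
    }
}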
@@ -8,15 +8,20 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.jsmart.zerocode.core.di.provider.GsonSerDeProvider;
import org.jsmart.zerocode.core.di.provider.ObjectMapperProvider;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigs;
@@ -180,11 +185,19 @@ public static void readJson(List<ConsumerJsonRecord> jsonRecords,

Object key = thisRecord.key();
Object value = thisRecord.value();
LOGGER.info("\nRecord Key - {} , Record value - {}, Record partition - {}, Record offset - {}",
key, value, thisRecord.partition(), thisRecord.offset());
Headers headers = thisRecord.headers();
LOGGER.info("\nRecord Key - {} , Record value - {}, Record partition - {}, Record offset - {}, Headers - {}",
key, value, thisRecord.partition(), thisRecord.offset(), headers);

JsonNode valueNode = objectMapper.readTree(value.toString());
Map<String, String> headersMap = null;
if (headers != null) {
headersMap = new HashMap<>();
for (Header header : headers) {
headersMap.put(header.key(), new String(header.value()));
}
}
ConsumerJsonRecord jsonRecord = new ConsumerJsonRecord(thisRecord.key(), null, valueNode, headersMap);
jsonRecords.add(jsonRecord);
}
}
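For context, once headers are copied into the map, a consumed record could render like this when asserted in a test step (values are illustrative; they mirror the unit test added at the end of this commit):

{
    "key": "key",
    "value": "value",
    "headers": { "headerKey": "headerValue" }
}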
@@ -29,7 +29,7 @@ public class KafkaProducerHelper {
private static final Gson gson = new GsonSerDeProvider().get();
private static final ObjectMapper objectMapper = new ObjectMapperProvider().get();

public static Producer<Long, String> createProducer(String bootStrapServers, String producerPropertyFile) {
try (InputStream propsIs = Resources.getResource(producerPropertyFile).openStream()) {
Properties properties = new Properties();
properties.load(propsIs);
@@ -57,21 +57,20 @@ public static ProducerRecord prepareRecordToSend(String topicName, ProducerRecord
recordToSend.partition(),
recordToSend.timestamp(),
recordToSend.key(),
recordToSend.value(),
recordToSend.headers());
}

public static ProducerRecord<Object, Object> prepareJsonRecordToSend(String topicName, ProducerJsonRecord<?> recordToSend) {
return ProducerRecordBuilder.from(topicName,
recordToSend.getKey(),
// --------------------------------------------
// It's JSON as a String. Nothing to worry about!
// Kafka's StringSerializer needs it in this format.
// --------------------------------------------
recordToSend.getValue().toString())
.withHeaders(recordToSend.getHeaders())
.build();
}


@@ -0,0 +1,39 @@
package org.jsmart.zerocode.core.kafka.helper;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

import java.util.Map;

public class ProducerRecordBuilder {
private String topic;
private Object key;
private Object value;
private Headers headers;

private ProducerRecordBuilder() {}

public static ProducerRecordBuilder from(String topic, Object key, Object value) {
ProducerRecordBuilder producerRecordBuilder = new ProducerRecordBuilder();

producerRecordBuilder.topic = topic;
producerRecordBuilder.key = key;
producerRecordBuilder.value = value;

return producerRecordBuilder;
}

public ProducerRecordBuilder withHeaders(Map<String, String> headers) {
if (headers != null) {
this.headers = new RecordHeaders();
headers.forEach((hKey, hValue) -> this.headers.add(hKey, hValue.getBytes()));
}

return this;
}

public ProducerRecord<Object, Object> build() {
return new ProducerRecord<>(topic, null, null, key, value, headers);
}
}
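A quick usage sketch of the new builder; the topic, key, value, and header values below are made up, and java.util.Collections, java.util.Map, and the Kafka ProducerRecord import are assumed in scope:

// Illustrative only; not part of this commit.
Map<String, String> headers = Collections.singletonMap("traceId", "abc-123");
ProducerRecord<Object, Object> record =
        ProducerRecordBuilder.from("demo-topic", "key-1", "{\"id\":1}")
                .withHeaders(headers)
                .build();
// record.headers() now carries traceId=abc-123 as raw bytes.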
@@ -4,19 +4,24 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;

import java.util.Map;

public class ConsumerJsonRecord<K> {
private final K key;
private final JsonNode jsonKey;
private final JsonNode value;
private final Map<String, String> headers;

@JsonCreator
public ConsumerJsonRecord(
@JsonProperty("key") K key,
@JsonProperty("jsonKey") JsonNode jsonKey,
@JsonProperty("value") JsonNode value) {
@JsonProperty("value") JsonNode value,
@JsonProperty("headers") Map<String, String> headers) {
this.key = key;
this.jsonKey = jsonKey;
this.value = value;
this.headers = headers;
}

public K getKey() {
@@ -31,6 +36,10 @@ public JsonNode getValue() {
return value;
}

public Map<String, String> getHeaders() {
return headers;
}

@Override
public String toString() {
return "Record{" +
@@ -61,6 +61,7 @@ public String send(String brokers, String topicName, String requestJson) throws
switch (recordType) {
case RAW:
rawRecords = gson.fromJson(requestJson, ProducerRawRecords.class);

String fileName = rawRecords.getFile();
if (fileName != null) {
File file = validateAndGetFile(fileName);
@@ -87,6 +88,7 @@ public String send(String brokers, String topicName, String requestJson) throws

case JSON:
jsonRecords = objectMapper.readValue(requestJson, ProducerJsonRecords.class);

fileName = jsonRecords.getFile();
if (fileName != null) {
File file = validateAndGetFile(fileName);
@@ -112,7 +114,7 @@ public String send(String brokers, String topicName, String requestJson) throws
}

} catch (Exception e) {
LOGGER.error("Error in sending record. Exception : " + e );
LOGGER.error("Error in sending record.", e);
String failedStatus = objectMapper.writeValueAsString(new DeliveryDetails(FAILED, e.getMessage()));
return prettyPrintJson(failedStatus);

@@ -131,7 +133,7 @@ private String sendRaw(String topicName,
ProducerRecord qualifiedRecord = prepareRecordToSend(topicName, recordToSend);

RecordMetadata metadata;
if (Boolean.TRUE.equals(isAsync)) {
LOGGER.info("Asynchronous Producer sending record - {}", qualifiedRecord);
metadata = (RecordMetadata) producer.send(qualifiedRecord, new ProducerAsyncCallback()).get();
} else {
@@ -157,7 +159,7 @@ private String sendJson(String topicName,
ProducerRecord record = prepareJsonRecordToSend(topicName, recordToSend);

RecordMetadata metadata;
if (Boolean.TRUE.equals(isAsync)) {
LOGGER.info("Asynchronous - Producer sending JSON record - {}", record);
metadata = (RecordMetadata) producer.send(record, new ProducerAsyncCallback()).get();
} else {
@@ -4,20 +4,25 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;

import java.util.Map;

// TODO - add timestamp, partition key etc
public class ProducerJsonRecord<K> {
private final K key;
private final JsonNode jsonKey;
private final JsonNode value;
private final Map<String, String> headers;

@JsonCreator
public ProducerJsonRecord(
@JsonProperty("key") K key,
@JsonProperty("jsonKey") JsonNode jsonKey,
@JsonProperty("value") JsonNode value) {
@JsonProperty("value") JsonNode value,
@JsonProperty("headers") Map<String, String> headers) {
this.key = key;
this.jsonKey = jsonKey;
this.value = value;
this.headers = headers;
}

public K getKey() {
@@ -32,12 +37,17 @@ public JsonNode getValue() {
return value;
}

public Map<String, String> getHeaders() {
return headers;
}

@Override
public String toString() {
return "Record{" +
"key='" + key + '\'' +
", jsonKey=" + jsonKey +
", value=" + value +
", headers=" + headers +
'}';
}
}
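With the headers property bound via @JsonProperty, a producer step's JSON record can carry headers next to key and value. An illustrative payload as Jackson would bind it to ProducerJsonRecord (all values made up):

{
    "key": "key-1",
    "value": { "id": 1 },
    "headers": { "traceId": "abc-123" }
}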
@@ -1,11 +1,25 @@
package org.jsmart.zerocode.core.kafka.helper;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Iterators;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.hamcrest.CoreMatchers;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigs;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigsWrap;
import org.jsmart.zerocode.core.kafka.receive.ConsumerCommonConfigs;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerJsonRecord;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mock;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -118,4 +132,25 @@ public void test_effectiveCommitAsyncFromCommon_true() throws Exception{
assertThat(consumerEffectiveConfigs.getCommitAsync(), is(true));
assertThat(consumerEffectiveConfigs.getCommitSync(), is(false));
}

@Test
public void should_read_json_with_headers_in_record() throws IOException {
// given
ConsumerRecord consumerRecord = Mockito.mock(ConsumerRecord.class);
Mockito.when(consumerRecord.key()).thenReturn("key");
Mockito.when(consumerRecord.value()).thenReturn("\"value\"");
Mockito.when(consumerRecord.headers())
.thenReturn(new RecordHeaders().add("headerKey", "headerValue".getBytes()));

// when
List<ConsumerJsonRecord> consumerJsonRecords = new ArrayList<>();
KafkaConsumerHelper.readJson(consumerJsonRecords, Iterators.forArray(consumerRecord));

// then
Assert.assertEquals(1, consumerJsonRecords.size());
ConsumerJsonRecord consumerJsonRecord = consumerJsonRecords.get(0);
Assert.assertEquals("key", consumerJsonRecord.getKey());
Assert.assertEquals("\"value\"", consumerJsonRecord.getValue().toString());
Assert.assertEquals(Collections.singletonMap("headerKey", "headerValue"), consumerJsonRecord.getHeaders());
}
}