Merge pull request #455 from RDBreed/451-send-dynamic-headers-to-kafka-in-records

#451 send dynamic headers to kafka in records
authorjapps authored Oct 28, 2020
2 parents 8e238d1 + 6c05e10 commit 1dcf641
Showing 9 changed files with 103 additions and 25 deletions.
ApiServiceExecutor.java
@@ -1,5 +1,7 @@
 package org.jsmart.zerocode.core.engine.executor;
 
+import org.jsmart.zerocode.core.engine.preprocessor.ScenarioExecutionState;
+
 public interface ApiServiceExecutor {
     /**
      *
@@ -25,8 +27,9 @@ public interface ApiServiceExecutor {
      * @param kafkaTopic Kafka topic(s) residing on the brokers
      * @param methodName A produce or consume or poll operation
      * @param requestJson RAW or JSON records for producing, config settings for consuming
+     * @param scenarioExecutionState The state of the scenario execution
      * @return String The broker acknowledgement in JSON
      */
-    String executeKafkaService(String kafkaServers, String kafkaTopic, String methodName, String requestJson);
+    String executeKafkaService(String kafkaServers, String kafkaTopic, String methodName, String requestJson, ScenarioExecutionState scenarioExecutionState);
 
 }
ApiServiceExecutor implementation
@@ -4,6 +4,7 @@
 import com.google.inject.name.Named;
 import org.jsmart.zerocode.core.engine.executor.httpapi.HttpApiExecutor;
 import org.jsmart.zerocode.core.engine.executor.javaapi.JavaMethodExecutor;
+import org.jsmart.zerocode.core.engine.preprocessor.ScenarioExecutionState;
 import org.jsmart.zerocode.core.kafka.client.BasicKafkaClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +55,7 @@ public String executeJavaOperation(String className, String methodName, String r
     }
 
     @Override
-    public String executeKafkaService(String kafkaServers, String kafkaTopic, String operation, String requestJson) {
-        return kafkaClient.execute(kafkaServers, kafkaTopic, operation, requestJson);
+    public String executeKafkaService(String kafkaServers, String kafkaTopic, String operation, String requestJson, ScenarioExecutionState scenarioExecutionState) {
+        return kafkaClient.execute(kafkaServers, kafkaTopic, operation, requestJson, scenarioExecutionState);
     }
 }
BasicKafkaClient.java
@@ -1,6 +1,7 @@
 package org.jsmart.zerocode.core.kafka.client;
 
 import com.google.inject.Inject;
+import org.jsmart.zerocode.core.engine.preprocessor.ScenarioExecutionState;
 import org.jsmart.zerocode.core.kafka.receive.KafkaReceiver;
 import org.jsmart.zerocode.core.kafka.send.KafkaSender;
 import org.slf4j.Logger;
@@ -19,7 +20,7 @@ public class BasicKafkaClient {
     public BasicKafkaClient() {
     }
 
-    public String execute(String brokers, String topicName, String operation, String requestJson) {
+    public String execute(String brokers, String topicName, String operation, String requestJson, ScenarioExecutionState scenarioExecutionState) {
         LOGGER.info("brokers:{}, topicName:{}, operation:{}, requestJson:{}", brokers, topicName, operation, requestJson);
 
         try {
@@ -28,7 +29,7 @@ public String execute(String brokers, String topicName, String operation, String
             case "load":
             case "publish":
             case "produce":
-                return sender.send(brokers, topicName, requestJson);
+                return sender.send(brokers, topicName, requestJson, scenarioExecutionState);
 
             case "unload":
             case "consume":
KafkaSender.java
@@ -6,30 +6,33 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.google.inject.name.Named;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.jsmart.zerocode.core.di.provider.GsonSerDeProvider;
 import org.jsmart.zerocode.core.di.provider.ObjectMapperProvider;
+import org.jsmart.zerocode.core.engine.preprocessor.ScenarioExecutionState;
+import org.jsmart.zerocode.core.engine.preprocessor.ZeroCodeAssertionsProcessorImpl;
 import org.jsmart.zerocode.core.kafka.delivery.DeliveryDetails;
 import org.jsmart.zerocode.core.kafka.send.message.ProducerJsonRecord;
 import org.jsmart.zerocode.core.kafka.send.message.ProducerJsonRecords;
 import org.jsmart.zerocode.core.kafka.send.message.ProducerRawRecords;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.net.URL;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
 import static org.jsmart.zerocode.core.constants.ZerocodeConstants.FAILED;
 import static org.jsmart.zerocode.core.constants.ZerocodeConstants.OK;
 import static org.jsmart.zerocode.core.kafka.KafkaConstants.JSON;
-import static org.jsmart.zerocode.core.kafka.KafkaConstants.RAW;
 import static org.jsmart.zerocode.core.kafka.KafkaConstants.PROTO;
+import static org.jsmart.zerocode.core.kafka.KafkaConstants.RAW;
 import static org.jsmart.zerocode.core.kafka.KafkaConstants.RECORD_TYPE_JSON_PATH;
 import static org.jsmart.zerocode.core.kafka.helper.KafkaProducerHelper.createProducer;
 import static org.jsmart.zerocode.core.kafka.helper.KafkaProducerHelper.prepareJsonRecordToSend;
@@ -46,10 +49,13 @@ public class KafkaSender {
     @Named("kafka.producer.properties")
     private String producerPropertyFile;
 
+    @Inject
+    private ZeroCodeAssertionsProcessorImpl zeroCodeAssertionsProcessor;
+
     private final ObjectMapper objectMapper = new ObjectMapperProvider().get();
     private final Gson gson = new GsonSerDeProvider().get();
 
-    public String send(String brokers, String topicName, String requestJson) throws JsonProcessingException {
+    public String send(String brokers, String topicName, String requestJson, ScenarioExecutionState scenarioExecutionState) throws JsonProcessingException {
         Producer<?, ?> producer = createProducer(brokers, producerPropertyFile);
         String deliveryDetails = null;
 
@@ -73,7 +79,7 @@ public String send(String brokers, String topicName, String requestJson) throws
                         LOGGER.info("From file:'{}', Sending record number: {}\n", fileName, i);
                         deliveryDetails = sendRaw(topicName, producer, record, rawRecords.getAsync());
                     }
-                } catch(Throwable ex) {
+                } catch (Throwable ex) {
                     throw new RuntimeException(ex);
                 }
             } else {
@@ -86,7 +92,7 @@ public String send(String brokers, String topicName, String requestJson) throws
                 }
 
                 break;
-           case PROTO:
+            case PROTO:
             case JSON:
                 jsonRecords = objectMapper.readValue(requestJson, ProducerJsonRecords.class);
 
@@ -96,16 +102,18 @@ public String send(String brokers, String topicName, String requestJson) throws
                     try (BufferedReader br = new BufferedReader(new FileReader(file))) {
                         String line;
                         for (int i = 0; (line = br.readLine()) != null; i++) {
+                            line = zeroCodeAssertionsProcessor.resolveStringJson(line,
+                                    scenarioExecutionState.getResolvedScenarioState());
                             ProducerJsonRecord record = objectMapper.readValue(line, ProducerJsonRecord.class);
                             LOGGER.info("From file:'{}', Sending record number: {}\n", fileName, i);
-                            deliveryDetails = sendJson(topicName, producer, record, jsonRecords.getAsync(),recordType,requestJson);
+                            deliveryDetails = sendJson(topicName, producer, record, jsonRecords.getAsync(), recordType, requestJson);
                         }
                     }
                 } else {
                     List<ProducerJsonRecord> records = jsonRecords.getRecords();
                     validateProduceRecord(records);
                     for (int i = 0; i < records.size(); i++) {
-                        deliveryDetails = sendJson(topicName, producer, records.get(i), jsonRecords.getAsync(),recordType,requestJson);
+                        deliveryDetails = sendJson(topicName, producer, records.get(i), jsonRecords.getAsync(), recordType, requestJson);
                     }
                 }
 
@@ -159,7 +167,7 @@ private String sendJson(String topicName,
                             Boolean isAsync,
                             String recordType,
                             String requestJson) throws InterruptedException, ExecutionException {
-        ProducerRecord record = prepareJsonRecordToSend(topicName, recordToSend,recordType, requestJson);
+        ProducerRecord record = prepareJsonRecordToSend(topicName, recordToSend, recordType, requestJson);
 
         RecordMetadata metadata;
         if (Boolean.TRUE.equals(isAsync)) {
@@ -182,13 +190,12 @@ private String sendJson(String topicName,
         return deliveryDetails;
     }
 
 
-
-        private File validateAndGetFile(String fileName) {
-        try{
+    private File validateAndGetFile(String fileName) {
+        try {
             URL resource = getClass().getClassLoader().getResource(fileName);
             return new File(resource.getFile());
-        } catch(Exception ex) {
+        } catch (Exception ex) {
             throw new RuntimeException("Error accessing file: `" + fileName + "' - " + ex);
         }
     }
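The functional core of this PR is the new step in the JSON branch of KafkaSender.send(): each line read from the record file is now passed through zeroCodeAssertionsProcessor.resolveStringJson(line, scenarioExecutionState.getResolvedScenarioState()) before deserialization, so ${$.step.response...} tokens inside file-based records are substituted from earlier steps' state, just as inline request payloads already are (note resolvedRequestJson in the runner hunk below). A minimal, self-contained sketch of the per-line token substitution idea, with a hypothetical resolver standing in for zerocode's processor:

// Illustrative only: the resolver below is NOT zerocode's API; it just shows
// the substitution that resolveStringJson performs against the resolved state.
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RecordLineResolverSketch {

    // Matches ${...} tokens, e.g. ${$.load_kafka.response.recordMetadata.topicPartition.topic}
    private static final Pattern TOKEN = Pattern.compile("\\$\\{(.+?)\\}");

    // Replaces each token with its value from the (already resolved) scenario state.
    static String resolve(String line, Map<String, String> resolvedState) {
        Matcher m = TOKEN.matcher(line);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            // Leave the token untouched if the path is not present in the state.
            String value = resolvedState.getOrDefault(m.group(1), m.group(0));
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> state = Map.of(
                "$.load_kafka.response.recordMetadata.topicPartition.topic", "demo-file-3");
        String line = "{\"key\":\"1\",\"value\":{\"name\":\"${$.load_kafka.response.recordMetadata.topicPartition.topic}-My-Value-1\"}}";
        // Prints: {"key":"1","value":{"name":"demo-file-3-My-Value-1"}}
        System.out.println(resolve(line, state));
    }
}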
Scenario runner (executeApi)
@@ -440,7 +440,7 @@ private String executeApi(String logPrefixRelationshipId,
                         .request(prettyPrintJson(resolvedRequestJson));
 
                 String topicName = url.substring(KAFKA_TOPIC.length());
-                executionResult = apiExecutor.executeKafkaService(kafkaServers, topicName, operationName, resolvedRequestJson);
+                executionResult = apiExecutor.executeKafkaService(kafkaServers, topicName, operationName, resolvedRequestJson, scenarioExecutionState);
                 break;
 
             case NONE:
MyCustomKafkaClient.java
@@ -1,5 +1,6 @@
 package org.jsmart.zerocode.kafka;
 
+import org.jsmart.zerocode.core.engine.preprocessor.ScenarioExecutionState;
 import org.jsmart.zerocode.core.kafka.client.BasicKafkaClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -17,7 +18,7 @@ public MyCustomKafkaClient() {
     }
 
     @Override
-    public String execute(String brokers, String topicName, String operation, String requestJson) {
+    public String execute(String brokers, String topicName, String operation, String requestJson, ScenarioExecutionState scenarioExecutionState) {
         customCodeExecuted = true;
         // ---
         // Use your custom send and receive mechanism here
@@ -30,7 +31,7 @@ public String execute(String brokers, String topicName, String operation, String
         // Just a sanity check if flow has hit this point or not.
         assertThat(customCodeExecuted, is(true));
 
-        return super.execute(brokers, topicName, operation, requestJson);
+        return super.execute(brokers, topicName, operation, requestJson, scenarioExecutionState);
     }
 }
KafkaProduceSyncFromFileJsonTest.java
@@ -15,4 +15,9 @@ public class KafkaProduceSyncFromFileJsonTest {
     public void testProduceAnd_syncFromFileJson() throws Exception {
     }
 
+    @Test
+    @Scenario("kafka/produce/file_produce/test_kafka_produce_sync_from_file_json_with_ref.json")
+    public void testProduceAnd_syncFromFileWithVarsJson() throws Exception {
+    }
+
 }
kafka/pfiles/test_data_json_with_vars.json (new file)
@@ -0,0 +1,2 @@
+{"key":"1546955346669","value":{"id":121,"name":"${$.load_kafka.response.recordMetadata.topicPartition.topic}-My-Value-1"}}
+{"key":"1546955346670","value":{"id":122,"name":"${$.load_kafka.response.recordMetadata.topicPartition.topic}-My-Value-2"}}
kafka/produce/file_produce/test_kafka_produce_sync_from_file_json_with_ref.json (new file)
@@ -0,0 +1,58 @@
+{
+    "scenarioName": "Produce a message - Sync - From File",
+    "steps": [
+        {
+            "name": "load_kafka",
+            "url": "kafka-topic:demo-file-3",
+            "operation": "produce",
+            "request": {
+                "async": false,
+                "recordType" : "JSON",
+                "file": "kafka/pfiles/test_data_json.json"
+            },
+            "assertions": {
+                "status" : "Ok",
+                "recordMetadata" : {
+                    "topicPartition" : {
+                        "topic" : "demo-file-3"
+                    }
+                }
+            }
+        },
+        {
+            "name": "load_kafka_with_ref",
+            "url": "kafka-topic:demo-file-3",
+            "operation": "produce",
+            "request": {
+                "async": false,
+                "recordType" : "JSON",
+                "file": "kafka/pfiles/test_data_json_with_vars.json"
+            },
+            "assertions": {
+                "status" : "Ok",
+                "recordMetadata" : {
+                    "topicPartition" : {
+                        "topic" : "demo-file-3"
+                    }
+                }
+            }
+        },
+        {
+            "name": "consume_raw",
+            "url": "kafka-topic:demo-file-3",
+            "operation": "unload",
+            "request": {
+                "consumerLocalConfigs": {
+                    "recordType" : "JSON",
+                    "commitSync": true,
+                    "showRecordsConsumed": true,
+                    "maxNoOfRetryPollsOrTimeouts": 3
+                }
+            },
+            "assertions": {
+                "size": 4
+            }
+        }
+
+    ]
+}
