Kafka sink #3127

Merged: 28 commits, Aug 26, 2023
The diff below shows changes from 15 of the 28 commits.

Commits (28)
e252f19  -Support for kafka-sink (rajeshLovesToCode, Jul 26, 2023)
288dded  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Jul 27, 2023)
4509740  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Jul 28, 2023)
5e33025  -Support for kafka-sink (rajeshLovesToCode, Jul 28, 2023)
3370ba2  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Jul 30, 2023)
3b59f68  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Aug 1, 2023)
996d99f  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Aug 2, 2023)
042fcca  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Aug 3, 2023)
ce747f6  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Aug 4, 2023)
b102fe7  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Aug 7, 2023)
3327799  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Aug 8, 2023)
18506b0  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Aug 9, 2023)
2708c67  -Support for kafka-sink (rajeshLovesToCode, Aug 9, 2023)
6daf21d  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Aug 10, 2023)
a92caa1  -Support for kafka-sink (rajeshLovesToCode, Aug 10, 2023)
ffec16e  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Aug 11, 2023)
574c5f9  -Support for kafka-sink (rajeshLovesToCode, Aug 11, 2023)
3a73019  -Support for kafka-sink (rajeshLovesToCode, Aug 11, 2023)
6050fa6  -Support for kafka-sink (rajeshLovesToCode, Aug 16, 2023)
19da476  -Support for kafka-sink (rajeshLovesToCode, Aug 21, 2023)
d427f80  -Support for kafka-sink (rajeshLovesToCode, Aug 21, 2023)
c09e92a  -Support for kafka-sink (rajeshLovesToCode, Aug 21, 2023)
eb97d1c  -Support for kafka-sink (rajeshLovesToCode, Aug 21, 2023)
1490dcb  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Aug 22, 2023)
a8f6547  -Support for kafka-sink (rajeshLovesToCode, Aug 23, 2023)
fc32f7c  Merge remote-tracking branch 'origin/kafka-sink' into kafka-sink (rajeshLovesToCode, Aug 23, 2023)
aa0fe36  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Aug 24, 2023)
f979bbc  Merge branch 'opensearch-project:main' into kafka-sink (rajeshLovesToCode, Aug 25, 2023)
311 changes: 311 additions & 0 deletions data-prepper-plugins/kafka-plugins/README-sink.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions data-prepper-plugins/kafka-plugins/build.gradle
@@ -48,6 +48,8 @@ dependencies {
implementation 'org.apache.kafka:connect-json:3.4.0'
implementation 'com.github.fge:json-schema-validator:2.2.14'
implementation 'commons-collections:commons-collections:3.2.2'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:apache-client'


}
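The two AWS SDK artifacts added above (software.amazon.awssdk:s3 and software.amazon.awssdk:apache-client) back the new S3-based schema lookup introduced in this PR (see S3FileConfig below). A minimal sketch of fetching a schema file with them; the class name S3SchemaFetcher is hypothetical, and the PR's actual retrieval logic lives in SchemaService, which is not shown in these hunks:

import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

public class S3SchemaFetcher {
    // Downloads the schema object as UTF-8 text using the Apache HTTP client.
    public static String fetchSchema(final String bucket, final String key, final String region) {
        try (S3Client s3 = S3Client.builder()
                .region(Region.of(region))
                .httpClientBuilder(ApacheHttpClient.builder())
                .build()) {
            return s3.getObjectAsBytes(GetObjectRequest.builder()
                    .bucket(bucket)
                    .key(key)
                    .build()).asUtf8String();
        }
    }
}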
SchemaConfig.java
@@ -6,6 +6,12 @@
package org.opensearch.dataprepper.plugins.kafka.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;

import java.util.stream.Stream;

/**
* * A helper class that helps to read user configuration values from
@@ -37,6 +43,51 @@ public class SchemaConfig {
@JsonProperty("basic_auth_credentials_source")
private String basicAuthCredentialsSource;

@JsonProperty("inline_schema")
private String inlineSchema;

@JsonProperty("schema_file_location")
private String schemaFileLocation;

@JsonProperty("s3_file_config")
private S3FileConfig s3FileConfig;

@JsonProperty("is_create")
Contributor: Not sure what is_create means; create a topic? It should be renamed to reflect its functionality.
Contributor Author: Changes have been incorporated.

@NotNull
private Boolean isCreate;

public static class S3FileConfig {
@Valid
@Size(max = 0, message = "Schema from file is not supported.")
Contributor: All three messages need to be updated to match their respective fields.
Contributor Author: Changes have been incorporated.

@JsonProperty("bucket_name")
private String bucketName;

@Valid
@Size(max = 0, message = "Schema from file is not supported.")
@JsonProperty("file_key")
private String fileKey;

@Valid
@Size(max = 0, message = "Schema from file is not supported.")
@JsonProperty("region")
private String region;

public String getRegion() {
return region;
}

public S3FileConfig() {
}

public String getBucketName() {
return bucketName;
}

public String getFileKey() {
return fileKey;
}
}

public int getSessionTimeoutms() {
return sessionTimeoutms;
}
@@ -64,4 +115,29 @@ public String getSchemaRegistryApiKey() {
public String getSchemaRegistryApiSecret() {
return schemaRegistryApiSecret;
}


public String getInlineSchema() {
return inlineSchema;
}

public String getSchemaFileLocation() {
return schemaFileLocation;
}

public S3FileConfig getS3FileConfig() {
return s3FileConfig;
}

@AssertTrue(message = "Only one of Inline schema or Schema file location or S3 file config config must be specified")
Contributor: Minor: "config" is repeated twice after "S3 file".
Contributor Author: Changes have been incorporated.

public boolean hasOnlyOneConfig() {
if(isCreate) {
return Stream.of(inlineSchema, schemaFileLocation, s3FileConfig).filter(n -> n != null).count() == 1;
}
return true;
}

public Boolean isCreate() {
return isCreate;
}
}
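The constraints above are enforced through jakarta.validation (already imported in this file) when the plugin configuration is validated. A minimal sketch, assuming a Bean Validation provider such as hibernate-validator is on the classpath, of how the @AssertTrue mutual-exclusion check surfaces; the JSON literal stands in for what Jackson would normally read from pipelines.yaml:

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.validation.Validation;
import jakarta.validation.Validator;
import jakarta.validation.ValidatorFactory;

public class SchemaConfigValidationExample {
    public static void main(String[] args) throws Exception {
        // Two schema sources plus is_create=true should trip hasOnlyOneConfig().
        final String json = "{\"is_create\": true, \"inline_schema\": \"{}\", "
                + "\"schema_file_location\": \"/tmp/schema.json\"}";
        final SchemaConfig config = new ObjectMapper().readValue(json, SchemaConfig.class);
        try (ValidatorFactory factory = Validation.buildDefaultValidatorFactory()) {
            final Validator validator = factory.getValidator();
            // Prints the @AssertTrue message declared on hasOnlyOneConfig().
            validator.validate(config).forEach(v -> System.out.println(v.getMessage()));
        }
    }
}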
TopicConfig.java
@@ -9,10 +9,10 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;

import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.time.Duration;

/**
* * A helper class that helps to read consumer configuration values from
* pipelines.yaml
@@ -23,6 +23,7 @@ public class TopicConfig {
static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45);
static final int DEFAULT_MAX_RETRY_ATTEMPT = Integer.MAX_VALUE;
static final String DEFAULT_AUTO_OFFSET_RESET = "latest";

static final Duration DEFAULT_THREAD_WAITING_TIME = Duration.ofSeconds(5);
static final Duration DEFAULT_MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4);
static final Duration DEFAULT_BUFFER_TIMEOUT = Duration.ofSeconds(5);
@@ -38,6 +39,11 @@ public class TopicConfig {
static final Integer DEFAULT_NUM_OF_WORKERS = 2;
static final Duration DEFAULT_HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5);


private static final Integer NUM_OF_PARTITIONS = 3;
private static final Short REPLICATION_FACTOR = 1;

Collaborator: Nit: rename NUM_OF_PARTITIONS to DEFAULT_NUM_OF_PARTITIONS and REPLICATION_FACTOR to DEFAULT_REPLICATION_FACTOR.
Contributor Author: Changes have been incorporated.


@JsonProperty("name")
@NotNull
@Valid
@@ -64,7 +70,7 @@ public class TopicConfig {
private Duration maxRetryDelay = DEFAULT_MAX_RETRY_DELAY;

@JsonProperty("serde_format")
private MessageFormat serdeFormat= MessageFormat.PLAINTEXT;
private MessageFormat serdeFormat = MessageFormat.PLAINTEXT;

@JsonProperty("auto_commit")
private Boolean autoCommit = DEFAULT_AUTO_COMMIT;
@@ -134,7 +140,16 @@ public class TopicConfig {
@JsonProperty("heart_beat_interval")
@Valid
@Size(min = 1)
private Duration heartBeatInterval= DEFAULT_HEART_BEAT_INTERVAL_DURATION;
private Duration heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL_DURATION;

@JsonProperty("is_create")
private Boolean isCreate=Boolean.FALSE;

@JsonProperty("number_of_partitions")
private Integer numberOfPartions = NUM_OF_PARTITIONS;

@JsonProperty("replication_factor")
private Short replicationFactor = REPLICATION_FACTOR;

public String getGroupId() {
return groupId;
@@ -288,8 +303,21 @@ public void setName(String name) {
this.name = name;
}


public KafkaKeyMode getKafkaKeyMode() {
return kafkaKeyMode;
}

public Boolean isCreate() {
return isCreate;
}

public Integer getNumberOfPartions() {
return numberOfPartions;
}

public Short getReplicationFactor() {
return replicationFactor;
}

}
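When is_create is true, the new number_of_partitions and replication_factor settings presumably drive topic creation through Kafka's admin API; the creation code itself is outside the hunks shown here. A minimal sketch of that pattern using the standard Kafka AdminClient:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreationExample {
    // Sketch of how a sink might consume isCreate(), getNumberOfPartions(),
    // and getReplicationFactor() to create the target topic up front.
    public static void createTopicIfRequested(final TopicConfig topicConfig,
                                              final String bootstrapServers) throws Exception {
        if (!topicConfig.isCreate()) {
            return;
        }
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient adminClient = AdminClient.create(props)) {
            final NewTopic newTopic = new NewTopic(topicConfig.getName(),
                    topicConfig.getNumberOfPartions(),
                    topicConfig.getReplicationFactor());
            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
        }
    }
}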
KafkaSinkProducer.java
@@ -13,11 +13,10 @@
import com.github.fge.jsonschema.core.report.ProcessingReport;
import com.github.fge.jsonschema.main.JsonSchema;
import com.github.fge.jsonschema.main.JsonSchemaFactory;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -27,7 +26,7 @@
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.service.SchemaService;
import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.slf4j.Logger;
@@ -54,57 +53,62 @@ public class KafkaSinkProducer<T> {

private final DLQSink dlqSink;

private final CachedSchemaRegistryClient schemaRegistryClient;

private final Collection<EventHandle> bufferedEventHandles;

private final ExpressionEvaluator expressionEvaluator;

private final ObjectMapper objectMapper = new ObjectMapper();

private final String tagTargetKey;

private final ObjectMapper objectMapper = new ObjectMapper();
private final String topicName;

private final String serdeFormat;

private final SchemaService schemaService;


public KafkaSinkProducer(final Producer producer,
final KafkaSinkConfig kafkaSinkConfig,
final DLQSink dlqSink,
final CachedSchemaRegistryClient schemaRegistryClient,
final ExpressionEvaluator expressionEvaluator,
final String tagTargetKey) {
final String tagTargetKey
) {
this.producer = producer;
this.kafkaSinkConfig = kafkaSinkConfig;
this.dlqSink = dlqSink;
this.schemaRegistryClient = schemaRegistryClient;
bufferedEventHandles = new LinkedList<>();
this.expressionEvaluator = expressionEvaluator;
this.tagTargetKey = tagTargetKey;
this.topicName = ObjectUtils.isEmpty(kafkaSinkConfig.getTopic()) ? null : kafkaSinkConfig.getTopic().getName();
this.serdeFormat = ObjectUtils.isEmpty(kafkaSinkConfig.getSerdeFormat()) ? null : kafkaSinkConfig.getSerdeFormat();
schemaService = new SchemaService.SchemaServiceBuilder().getFetchSchemaService(topicName, kafkaSinkConfig.getSchemaConfig()).build();


}

public void produceRecords(final Record<Event> record) {
if (record.getData().getEventHandle() != null) {
bufferedEventHandles.add(record.getData().getEventHandle());
}
TopicConfig topic = kafkaSinkConfig.getTopic();
Event event = getEvent(record);
final String key = event.formatString(kafkaSinkConfig.getPartitionKey(), expressionEvaluator);
Object dataForDlq = event.toJsonString();
LOG.info("Producing record " + dataForDlq);
try {
final String serdeFormat = kafkaSinkConfig.getSerdeFormat();
if (MessageFormat.JSON.toString().equalsIgnoreCase(serdeFormat)) {
publishJsonMessage(record, topic, key, dataForDlq);
publishJsonMessage(record, key);
} else if (MessageFormat.AVRO.toString().equalsIgnoreCase(serdeFormat)) {
publishAvroMessage(record, topic, key, dataForDlq);
publishAvroMessage(record, key);
Collaborator: Why this check? Why not serdeFormat == MessageFormat.AVRO?
Contributor Author: Changes have been incorporated.

} else {
publishPlaintextMessage(record, topic, key, dataForDlq);
publishPlaintextMessage(record, key);
}
} catch (Exception e) {
LOG.error("Error occured while publishing " + e.getMessage());
releaseEventHandles(false);
}

}

private Event getEvent(Record<Event> record) {
private Event getEvent(final Record<Event> record) {
Event event = record.getData();
try {
event = addTagsToEvent(event, tagTargetKey);
@@ -114,54 +118,57 @@ private Event getEvent(Record<Event> record) {
return event;
}

private void publishPlaintextMessage(Record<Event> record, TopicConfig topic, String key, Object dataForDlq) {
producer.send(new ProducerRecord(topic.getName(), key, record.getData().toJsonString()), callBack(dataForDlq));

private void publishPlaintextMessage(final Record<Event> record, final String key) {
send(topicName, key, record.getData().toJsonString());
}

private void publishAvroMessage(final Record<Event> record, final String key) {
final Schema avroSchema = schemaService.getSchema(topicName);
if (avroSchema == null) {
throw new RuntimeException("Schema definition is mandatory in case of type avro");
}
final GenericRecord genericRecord = getGenericRecord(record.getData(), avroSchema);
send(topicName, key, genericRecord);
}

private void publishAvroMessage(Record<Event> record, TopicConfig topic, String key, Object dataForDlq) throws RestClientException, IOException {
final String valueToParse = schemaRegistryClient.
getLatestSchemaMetadata(topic.getName() + "-value").getSchema();
final Schema schema = new Schema.Parser().parse(valueToParse);
final GenericRecord genericRecord = getGenericRecord(record.getData(), schema);
producer.send(new ProducerRecord(topic.getName(), key, genericRecord), callBack(dataForDlq));
private void send(final String topicName, final String key, final Object record) {
producer.send(new ProducerRecord(topicName, key, record), callBack(record));
}
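getGenericRecord is called by publishAvroMessage above but falls outside the hunks shown. A minimal sketch, assuming the event's fields map one-to-one onto the Avro schema (the GenericData import at the top of this file suggests roughly this shape; the merged implementation may differ):

// Hypothetical sketch of the helper referenced above.
private GenericRecord getGenericRecord(final Event event, final Schema schema) {
    final GenericRecord genericRecord = new GenericData.Record(schema);
    for (final Schema.Field field : schema.getFields()) {
        // Copy each schema field out of the event; absent fields remain null.
        genericRecord.put(field.name(), event.get(field.name(), Object.class));
    }
    return genericRecord;
}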

private void publishJsonMessage(Record<Event> record, TopicConfig topic, String key, Object dataForDlq) throws IOException, RestClientException, ProcessingException {
private void publishJsonMessage(final Record<Event> record, final String key) throws IOException, ProcessingException {
final JsonNode dataNode = new ObjectMapper().convertValue(record.getData().toJsonString(), JsonNode.class);
if (validateJson(topic.getName(), dataForDlq)) {
producer.send(new ProducerRecord(topic.getName(), key, dataNode), callBack(dataForDlq));
if (validateJson(topicName, record.getData().toJsonString())) {
send(topicName, key, dataNode);
} else {
dlqSink.perform(dataForDlq, new RuntimeException("Invalid Json"));
dlqSink.perform(dataNode, new RuntimeException("Invalid Json"));
}
}

private Boolean validateJson(final String topicName, Object dataForDlq) throws IOException, RestClientException, ProcessingException {
if (schemaRegistryClient != null) {
final String schemaJson = schemaRegistryClient.
getLatestSchemaMetadata(topicName + "-value").getSchema();
return validateSchema(dataForDlq.toString(), schemaJson);
private Boolean validateJson(final String topicName, final String jsonData) throws IOException, ProcessingException {
final String schemaJson = schemaService.getValueToParse(topicName);
if (schemaJson != null) {
return validateSchema(jsonData, schemaJson);

} else {
return true;
}
}

private boolean validateSchema(final String jsonData, final String schemaJson) throws IOException, ProcessingException {
public boolean validateSchema(final String jsonData, final String schemaJson) throws IOException, ProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode schemaNode = objectMapper.readTree(schemaJson);
JsonNode dataNode = objectMapper.readTree(jsonData);
JsonSchemaFactory schemaFactory = JsonSchemaFactory.byDefault();
JsonSchema schema = schemaFactory.getJsonSchema(schemaNode);
ProcessingReport report = schema.validate(dataNode);
if (report.isSuccess()) {
return true;
} else {
return false;
}
return report != null ? report.isSuccess() : false;
}
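Since validateSchema is now public, it can be exercised on its own. A self-contained sketch of the same fge json-schema-validator flow it wraps, with illustrative schema and payload literals; a payload failing this check is routed to the DLQ by publishJsonMessage above:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.fge.jsonschema.core.report.ProcessingReport;
import com.github.fge.jsonschema.main.JsonSchema;
import com.github.fge.jsonschema.main.JsonSchemaFactory;

public class ValidateSchemaExample {
    public static void main(String[] args) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();
        final JsonNode schemaNode = mapper.readTree("{\"type\":\"object\",\"required\":[\"id\"]}");
        final JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode);
        // A payload missing the required "id" field fails validation.
        final ProcessingReport report = schema.validate(mapper.readTree("{\"name\":\"a\"}"));
        System.out.println(report.isSuccess()); // prints: false
    }
}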

private Callback callBack(final Object dataForDlq) {
return (metadata, exception) -> {
if (null != exception) {
LOG.error("Error occured while publishing " + exception.getMessage());
releaseEventHandles(false);
dlqSink.perform(dataForDlq, exception);
} else {
@@ -186,7 +193,7 @@ private void releaseEventHandles(final boolean result) {
bufferedEventHandles.clear();
}

private Event addTagsToEvent(Event event, String tagsTargetKey) throws JsonProcessingException {
private Event addTagsToEvent(final Event event, final String tagsTargetKey) throws JsonProcessingException {
String eventJsonString = event.jsonBuilder().includeTags(tagsTargetKey).toJsonString();
Map<String, Object> eventData = objectMapper.readValue(eventJsonString, new TypeReference<>() {
});
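SchemaService is the other half of this refactor, replacing the direct CachedSchemaRegistryClient usage, but its source is not part of the hunks shown here. A hypothetical reconstruction from its call sites in KafkaSinkProducer (getFetchSchemaService, build, getSchema, getValueToParse); resolution order, registry support, and error handling in the merged class may well differ:

import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.avro.Schema;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;

public class SchemaService {
    private final SchemaConfig schemaConfig;

    private SchemaService(final SchemaConfig schemaConfig) {
        this.schemaConfig = schemaConfig;
    }

    // Raw schema text: inline schema first, then a local file, then S3.
    // Returns null when nothing is configured or retrieval fails.
    public String getValueToParse(final String topicName) {
        if (schemaConfig == null) {
            return null;
        }
        try {
            if (schemaConfig.getInlineSchema() != null) {
                return schemaConfig.getInlineSchema();
            }
            if (schemaConfig.getSchemaFileLocation() != null) {
                return new String(Files.readAllBytes(Paths.get(schemaConfig.getSchemaFileLocation())));
            }
            if (schemaConfig.getS3FileConfig() != null) {
                final SchemaConfig.S3FileConfig s3 = schemaConfig.getS3FileConfig();
                // Reuses the hypothetical S3SchemaFetcher sketched earlier.
                return S3SchemaFetcher.fetchSchema(s3.getBucketName(), s3.getFileKey(), s3.getRegion());
            }
        } catch (final Exception e) {
            return null;
        }
        return null;
    }

    // Parsed Avro schema, or null when no schema text can be resolved.
    public Schema getSchema(final String topicName) {
        final String schemaText = getValueToParse(topicName);
        return schemaText == null ? null : new Schema.Parser().parse(schemaText);
    }

    public static class SchemaServiceBuilder {
        private SchemaConfig schemaConfig;

        public SchemaServiceBuilder getFetchSchemaService(final String topicName, final SchemaConfig schemaConfig) {
            this.schemaConfig = schemaConfig;
            return this;
        }

        public SchemaService build() {
            return new SchemaService(schemaConfig);
        }
    }
}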