Skip to content

Commit

Permalink
-Support for kafka-sink (#2999)
Browse files Browse the repository at this point in the history
* -Support for kafka-sink
Signed-off-by: rajeshLovesToCode <[email protected]>

* -Support for kafka-sink
Signed-off-by: rajeshLovesToCode <[email protected]>
  • Loading branch information
rajeshLovesToCode authored Aug 7, 2023
1 parent 259fea1 commit 0582788
Show file tree
Hide file tree
Showing 20 changed files with 2,271 additions and 3 deletions.
7 changes: 7 additions & 0 deletions data-prepper-plugins/kafka-plugins/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ dependencies {
testImplementation 'org.apache.kafka:kafka-clients:3.4.0:test'
testImplementation 'org.apache.kafka:connect-json:3.4.0'
testImplementation('com.kjetland:mbknor-jackson-jsonschema_2.13:1.0.39')
implementation project(':data-prepper-plugins:failures-common')
testImplementation group: 'org.powermock', name: 'powermock-api-mockito2', version: '2.0.9'
implementation 'org.apache.kafka:connect-json:3.4.0'
implementation 'com.github.fge:json-schema-validator:2.2.14'
implementation 'commons-collections:commons-collections:3.2.2'


}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package org.opensearch.dataprepper.plugins.kafka.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;

import java.util.stream.Stream;

Expand All @@ -27,6 +27,9 @@ public static class SaslAuthConfig {
@JsonProperty("aws_msk_iam")
private AwsIamAuthConfig awsIamAuthConfig;

@JsonProperty("ssl_endpoint_identification_algorithm")
private String sslEndpointAlgorithm;

public AwsIamAuthConfig getAwsIamAuthConfig() {
return awsIamAuthConfig;
}
Expand All @@ -39,14 +42,19 @@ public OAuthConfig getOAuthConfig() {
return oAuthConfig;
}

public String getSslEndpointAlgorithm() {
return sslEndpointAlgorithm;
}

@AssertTrue(message = "Only one of AwsIam or oAuth or PlainText auth config must be specified")
public boolean hasOnlyOneConfig() {
return Stream.of(awsIamAuthConfig, plainTextAuthConfig, oAuthConfig).filter(n -> n!=null).count() == 1;
return Stream.of(awsIamAuthConfig, plainTextAuthConfig, oAuthConfig).filter(n -> n != null).count() == 1;
}

}

public static class SslAuthConfig {

public static class SslAuthConfig {
// TODO Add Support for SSL authentication types like
// one-way or two-way authentication

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package org.opensearch.dataprepper.plugins.kafka.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.time.Duration;
import java.util.List;

public class KafkaProducerProperties {
private static final String DEFAULT_BYTE_CAPACITY = "50mb";

@JsonProperty("buffer_memory")
private String bufferMemory = DEFAULT_BYTE_CAPACITY;

@JsonProperty("compression_type")
private String compressionType;

@JsonProperty("retries")
private int retries;

@JsonProperty("batch_size")
private int batchSize;

@JsonProperty("client_dns_lookup")
private String clientDnsLookup;

@JsonProperty("client_id")
private String clientId;

@JsonProperty("connections_max_idle")
private Duration connectionsMaxIdleMs;

@JsonProperty("delivery_timeout")
private Duration deliveryTimeoutMs;

@JsonProperty("linger_ms")
private Long lingerMs;

@JsonProperty("max_block")
private Duration maxBlockMs;

@JsonProperty("max_request_size")
private int maxRequestSize;

@JsonProperty("partitioner_class")
private Class partitionerClass;

@JsonProperty("partitioner_ignore_keys")
private Boolean partitionerIgnoreKeys;

@JsonProperty("receive_buffer")
private String receiveBufferBytes=DEFAULT_BYTE_CAPACITY;

@JsonProperty("request_timeout")
private Duration requestTimeoutMs;

@JsonProperty("send_buffer")
private String sendBufferBytes=DEFAULT_BYTE_CAPACITY;

@JsonProperty("socket_connection_setup_timeout_max")
private Duration socketConnectionSetupMaxTimeout;

@JsonProperty("socket_connection_setup_timeout")
private Duration socketConnectionSetupTimeout;

@JsonProperty("acks")
private String acks;

@JsonProperty("enable_idempotence")
private Boolean enableIdempotence;

@JsonProperty("interceptor_classes")
private List interceptorClasses;

@JsonProperty("max_in_flight_requests_per_connection")
private int maxInFlightRequestsPerConnection;

@JsonProperty("metadata_max_age")
private Duration metadataMaxAgeMs;

@JsonProperty("metadata_max_idle")
private Duration metadataMaxIdleMs;

@JsonProperty("metric_reporters")
private List metricReporters;

@JsonProperty("metrics_num_samples")
private int metricsNumSamples;

@JsonProperty("metrics_recording_level")
private String metricsRecordingLevel;

@JsonProperty("metrics_sample_window")
private Duration metricsSampleWindowMs;

@JsonProperty("partitioner_adaptive_partitioning_enable")
private boolean partitionerAdaptivePartitioningEnable;

@JsonProperty("partitioner_availability_timeout")
private Duration partitionerAvailabilityTimeoutMs;

@JsonProperty("reconnect_backoff_max")
private Duration reconnectBackoffMaxMs;

@JsonProperty("reconnect_backoff")
private Duration reconnectBackoffMs;

@JsonProperty("retry_backoff")
private Duration retryBackoffMs;


public String getCompressionType() {
return compressionType;
}

public int getRetries() {
if (retries == 0) {
retries = 5;
}
return retries;
}

public int getBatchSize() {
return batchSize;
}

public String getClientDnsLookup() {
return clientDnsLookup;
}

public String getClientId() {
return clientId;
}


public Long getLingerMs() {
return lingerMs;
}


public int getMaxRequestSize() {
return maxRequestSize;
}

public Class getPartitionerClass() {
return partitionerClass;
}

public Boolean getPartitionerIgnoreKeys() {
return partitionerIgnoreKeys;
}


public String getAcks() {
return acks;
}

public Boolean getEnableIdempotence() {
return enableIdempotence;
}

public List getInterceptorClasses() {
return interceptorClasses;
}

public int getMaxInFlightRequestsPerConnection() {
return maxInFlightRequestsPerConnection;
}


public List getMetricReporters() {
return metricReporters;
}

public int getMetricsNumSamples() {
return metricsNumSamples;
}

public String getMetricsRecordingLevel() {
return metricsRecordingLevel;
}


public boolean isPartitionerAdaptivePartitioningEnable() {
return partitionerAdaptivePartitioningEnable;
}

public String getBufferMemory() {
return bufferMemory;
}

public Long getConnectionsMaxIdleMs() {
return connectionsMaxIdleMs.toMillis();
}

public Long getDeliveryTimeoutMs() {
return deliveryTimeoutMs.toMillis();
}

public Long getMaxBlockMs() {
return maxBlockMs.toMillis();
}

public String getReceiveBufferBytes() {
return receiveBufferBytes;
}

public Long getRequestTimeoutMs() {
return requestTimeoutMs.toMillis();
}

public String getSendBufferBytes() {
return sendBufferBytes;
}

public Long getSocketConnectionSetupMaxTimeout() {
return socketConnectionSetupMaxTimeout.toMillis();
}

public Long getSocketConnectionSetupTimeout() {
return socketConnectionSetupTimeout.toMillis();
}

public Long getMetadataMaxAgeMs() {
return metadataMaxAgeMs.toMillis();
}

public Long getMetadataMaxIdleMs() {
return metadataMaxIdleMs.toMillis();
}

public Long getMetricsSampleWindowMs() {
return metricsSampleWindowMs.toMillis();
}

public Long getPartitionerAvailabilityTimeoutMs() {
return partitionerAvailabilityTimeoutMs.toMillis();
}

public Long getReconnectBackoffMaxMs() {
return reconnectBackoffMaxMs.toMillis();
}

public Long getReconnectBackoffMs() {
return reconnectBackoffMs.toMillis();
}

public Long getRetryBackoffMs() {
return retryBackoffMs.toMillis();
}
}
Loading

0 comments on commit 0582788

Please sign in to comment.