
Commit

Updated Kafka README. Cleaned up configuration
Signed-off-by: Krishna Kondaka <[email protected]>
Krishna Kondaka committed Aug 10, 2023
1 parent 049896a commit 46c4957
Showing 6 changed files with 84 additions and 171 deletions.
167 changes: 74 additions & 93 deletions data-prepper-plugins/kafka-plugins/README.md
# Kafka source

This source allows Data Prepper to use Kafka as a source. It reads records from one or more Kafka topics, using the Kafka consumer API to read messages from the Kafka brokers and create Data Prepper events for further processing by the Data Prepper pipeline.

## Basic Usage
The following pipeline configuration reads plain string messages from two configured Kafka topics, `Topic1` and `Topic2`.
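
A minimal configuration of this form, with a placeholder broker address and consumer group ids, might look like the following sketch:

```
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - 127.0.0.1:9092
      topics:
        - name: Topic1
          group_id: groupID1
        - name: Topic2
          group_id: groupID1
  sink:
    - stdout:
```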


## Configuration Options

* `bootstrap_servers` (Required when not using MSK) : The host/port pairs used to establish the initial connection to the Kafka cluster. Multiple brokers can be configured. When using MSK as the Kafka cluster, the bootstrap server information is obtained from MSK using the MSK ARN provided in the config.

* `topics` (Required) : List of topics to read the messages from. The maximum number of topics is 10. See [Topic Configuration](#topic_configuration) for details.

* `schema` (Optional) : Schema Registry configuration. See [Schema Registry Configuration](#schema_configuration) for details.

* `authentication` (Optional) : Authentication configuration. See [Authentication Configuration](#authentication_configuration) for details.

* `encryption` (Optional) : Encryption configuration. See [Encryption Configuration](#encryption_configuration) for details.

* `aws` (Optional) : AWS configuration. See [AWS Configuration](#aws_configuration) for details.

* `acknowledgments` (Optional) : Enables end-to-end acknowledgments. If set to `true`, consumer offsets are committed only after all events created from the consumed records are successfully acknowledged by all sinks. Default value is `false`.

* `acknowledgments_timeout` (Optional) : Maximum time to wait for the acknowledgments to be received. Default value is `30s`.

* `client_dns_lookup` (Optional) : Sets Kafka's `client.dns.lookup` option. This is needed when DNS aliases are used. Default value is `default`.
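
As an illustration, end-to-end acknowledgments could be enabled on the source as in the sketch below (only the relevant keys are shown; the timeout value is arbitrary):

```
kafka:
  acknowledgments: true
  acknowledgments_timeout: 60s
```
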
### <a name="topic_configuration">Topic Configuration</a>

* `name` (Required) : The name of the topic. Multiple topics can be configured in the list, and the maximum number of topics is 10.

* `group_id` (Required) : Sets Kafka's `group.id` option.

* `workers` (Optional) : Number of multithreaded consumers associated with each topic. Default value is `2`. Maximum value is 200.

* `serde_format` (Optional) : Indicates the serialization and deserialization format of the messages in the topic. Default value is `plaintext`.

* `auto_commit` (Optional) : If false, the consumer's offset will not be periodically committed to Kafka in the background. Default value is `false`.

* `commit_interval` (Optional) : The frequency in seconds at which the consumer offsets are committed to Kafka. Used to set Kafka's auto.commit.interval.ms option when `auto_commit` is enabled, and used as the commit interval when auto commit is disabled. Default value is `5s`.

* `session_timeout` (Optional) : The timeout used to detect client failures when using Kafka's group management. It is used for the rebalance. Default value is `45s`.

* `auto_offset_reset` (Optional) : Sets Kafka's `auto.offset.reset` option. Default value is `latest`.

* `thread_waiting_time` (Optional) : The time a thread waits for other threads to complete their tasks and signal it. The Kafka consumer poll timeout is set to half of `thread_waiting_time`. Default value is `5s`.

* `max_partition_fetch_bytes` (Optional) : Sets Kafka's max.partition.fetch.bytes option. Default value `1048576` (1MB).

* `heart_beat_interval` (Optional) : The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Used to set Kafka's heartbeat.interval.ms option. Defaults to `1s`.

* `fetch_max_bytes` (Optional) : The maximum record batch size accepted by the broker. Sets Kafka's fetch.max.bytes option. Default value `52428800`.

* `fetch_max_wait` (Optional) : The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement. Sets Kafka's fetch.max.wait.ms option. Default value `500`.

* `fetch_min_bytes` (Optional) : The minimum amount of data the server should return for a fetch request. Sets Kafka's fetch.min.bytes option. Default value `1`.

* `retry_backoff` (Optional) : The amount of time to wait before attempting to retry a failed request to a given topic partition. Sets Kafka's retry.backoff.ms option. Default value `10s`.

* `reconnect_backoff` (Optional) : Sets Kafka's reconnect.backoff.ms option. Default value `10s`.

* `max_poll_interval` (Optional) : The maximum delay between invocations of poll() when using consumer group management. Sets Kafka's max.poll.interval.ms option. Defaults to `300s`.

* `consumer_max_poll_records` (Optional) : The maximum number of records returned in a single call to poll(). Sets Kafka's max.poll.records option. Defaults to `500`.

* `key_mode` (Optional) : Indicates how the key field of the Kafka message is handled. The default value is `include_as_field`, which means the key is included in the event as `kafka_key`. If `discard` is used, the key is discarded entirely. If `include_as_metadata` is used, the key is put in the event metadata.
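
Putting several of these options together, a single topic entry might look like the following sketch (the topic name, group id, and tuning values are placeholders):

```
topics:
  - name: my-topic
    group_id: my-consumer-group
    workers: 4
    serde_format: plaintext
    auto_commit: false
    commit_interval: 10s
    auto_offset_reset: earliest
    max_partition_fetch_bytes: 2097152
    key_mode: include_as_metadata
```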

### <a name="schema_configuration">Schema Configuration</a>

* `type` (Required) : Valid types are `glue` and `confluent`. Specify `glue` when using the AWS Glue Schema Registry and `confluent` when using the Confluent Schema Registry. When the `glue` registry is used, the AWS config options under the `aws` section are used.

The following config options are valid only for the Confluent Registry.

* `registry_url` (Required) : The URL of the Confluent Schema Registry, for example `http://localhost:8081`.

* `version` (Required) : The version of the schema to fetch from the registry.

* `schema_registry_api_key` (Required) : Schema Registry API key. Used for secure communication with the schema registry.

* `schema_registry_api_secret` (Required) : Schema Registry API secret. Used for secure communication with the schema registry.
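
For instance, a Confluent registry could be configured as in the sketch below (the URL, version, and credentials are placeholders):

```
schema:
  type: confluent
  registry_url: http://localhost:8081
  version: 1
  schema_registry_api_key: my-registry-key
  schema_registry_api_secret: my-registry-secret
```
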
### <a name="authentication_configuration">Authentication Configuration</a>

### <a name="auth_configuration">OAuth Configuration for SASLOAUTH</a>
* `sasl` (Required) : SASL authentication configuration. See [SASL Configuration](#sasl_configuration) for details.

- `oauth_client_id`: It is the client id is the public identifier of your authorization server.
### <a name="sasl_configuration">SASL Configuration</a>

- `oauth_client_secret` : It is a secret known only to the application and the authorization server.
One of the following options is required.

- `oauth_login_server` : The URL of the OAuth server.(Eg: https://dev.okta.com)
* `plaintext` (Optional) : Plaintext configuration. See [Plaintext Configuration](#plaintext_configuration) for details.

- `oauth_login_endpoint`: The End point URL of the OAuth server.(Eg: /oauth2/default/v1/token)
* `aws_msk_iam` (Optional) : AWS MSK IAM configuration. This can take either `role` or `default` values. When `role` option is used, the `sts_role_arn` used in the `aws` config is used to assume the role. Default value is `default`.

- `oauth_login_grant_type` (Optional) : This grant type refers to the way an application gets an access token.
### <a name="plaintext_configuration">Plaintext Configuration</a>

- `oauth_login_scope` (Optional) : This scope limit an application's access to a user's account.
- `username` (Required) : A String of username to be used for Kafka cluster authentication.

- `oauth_introspect_server` (Optional) : The URL of the introspect server. Most of the cases it should be similar to the oauth_login_server URL (Eg:https://dev.okta.com)
- `password` (Required) : A String of password to be used for Kafka cluster authentication.

- `oauth_introspect_endpoint` (Optional) : The end point of the introspect server URL.(Eg: /oauth2/default/v1/introspect)
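
For example, plaintext SASL authentication could be configured as in this sketch (the credentials are placeholders and would normally come from a secret store rather than being hard-coded):

```
authentication:
  sasl:
    plaintext:
      username: kafka-user
      password: kafka-password
```
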
### <a name="encryption_configuration">Encryption Configuration</a>

* `type` (Optional) : Encryption type. Default value is `ssl`. Use `none` to disable encryption.

* `insecure` (Optional) : A boolean flag to turn off SSL certificate verification. If set to `true`, CA certificate verification is turned off and insecure requests are sent. Default value is `false`.
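
For example, SSL encryption with certificate verification turned off (appropriate only for test brokers) could be written as in this sketch:

```
encryption:
  type: ssl
  insecure: true
```
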
### <a name="aws_configuration">AWS Configuration</a>

* `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
* `sts_role_arn` (Optional) : The AWS STS role to assume for requests to MSK and the AWS Glue Schema Registry. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
* `msk` (Optional) : MSK configurations. See [MSK Configuration](#msk_configuration) for details.

### <a name="msk_configuration">MSK Configuration</a>

* `arn` (Required) : The MSK ARN to use.

* `broker_connection_type` (Optional) : Type of connection to use with the MSK broker. Allowed values are `public`, `single_vpc` and `multi_vpc`. Default value is `single_vpc`.
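
A source reading from MSK with IAM authentication might therefore combine the `aws` and `authentication` sections roughly as follows (the region and ARNs are placeholders):

```
aws:
  region: us-east-1
  sts_role_arn: arn:aws:iam::123456789012:role/my-data-prepper-role
  msk:
    arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcd1234-ab12-cd34-ef56-abcdef123456
    broker_connection_type: single_vpc
authentication:
  sasl:
    aws_msk_iam: role
```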

## Integration Tests


This plugin is compatible with Java 11. See

* [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
* [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)

In the same commit, the Kafka source's authentication config class is updated: the unused SSL auth configuration is commented out for future use, while the SASL configuration is retained.

@@ -46,6 +46,8 @@ public boolean hasOnlyOneConfig() {

}

/*
* For Future USE
public static class SslAuthConfig {
// TODO Add Support for SSL authentication types like
// one-way or two-way authentication
@@ -57,14 +59,16 @@ public SslAuthConfig() {
@JsonProperty("ssl")
private SslAuthConfig sslAuthConfig;

public SslAuthConfig getSslAuthConfig() {
return sslAuthConfig;
}
*/

@Valid
@JsonProperty("sasl")
private SaslAuthConfig saslAuthConfig;

public SaslAuthConfig getSaslAuthConfig() {
return saslAuthConfig;
}
