
Use Kafka as a buffer #3322

Closed
JonahCalvo opened this issue Sep 11, 2023 · 5 comments
Labels
enhancement New feature or request plugin - buffer A plugin for buffering incoming data

JonahCalvo (Contributor) commented Sep 11, 2023

Use-case

Currently, the only buffer available with Data Prepper is the bounded_blocking buffer, which stores events in memory. This can lead to data loss if a pipeline crashes or the buffer overflows. A disk-based buffer is required to prevent this data loss.

This proposal is to implement a Kafka buffer. Kafka offers robust buffering capabilities by persistently storing data on disk across multiple nodes, ensuring high availability and fault tolerance.

Basic Configuration

The buffer will:

  • write incoming bytes to Kafka
  • consume from Kafka
  • call back to the pipeline source to deserialize the bytes
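The write/consume/callback flow above can be sketched as follows. This is a hypothetical illustration only: Data Prepper is written in Java, and here an in-memory queue stands in for the Kafka topic; all names (`write`, `read`, `deserialize_bytes`) are invented for the sketch.

```python
import json
from queue import Queue

# In-memory queue standing in for the Kafka topic (illustration only).
topic = Queue()

def write(raw: bytes) -> None:
    """The buffer writes incoming bytes verbatim; no serde is applied."""
    topic.put(raw)

def deserialize_bytes(raw: bytes) -> dict:
    """Callback the source would supply; here, a trivial JSON decoder."""
    return json.loads(raw)

def read() -> dict:
    """The buffer consumes raw bytes, then calls back to the source to decode them."""
    return deserialize_bytes(topic.get())

write(b'{"message": "hello"}')
event = read()
print(event)  # {'message': 'hello'}
```

The point of the design is that the buffer itself never interprets the payload; only the source that produced the bytes knows how to turn them back into an event.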

Sample configuration

buffer:
  kafka_buffer:
    bootstrap_servers:
      - 127.0.0.1:9093
    acknowledgments: true
    topic:
      name: "pipeline-buffer"
      group_id: "kafka-group"
    workers: 2
    authentication:
      sasl_plaintext:
        username: admin
        password: admin-secret

The configuration will be similar to that of the Kafka source and sink. Notably, only one topic will be provided, and serde_format will not be configurable, as the buffer will read and write raw bytes. Attributes that were previously set per topic, such as workers, will become attributes of the plugin rather than of the topic.

Detailed Process

  • Producer and Consumer Logic: Reuse the logic from Kafka Source/Sink for both writing to and reading from the Kafka buffer.
  • Data Writing: Sources will write data to the buffer in raw bytes format.
  • Deserialization: To optimize performance and avoid re-serializing events, sources will implement the RawByteHandler interface. This interface will include a deserializeBytes() function, which the Kafka buffer will call back to when reading data.
  • Compatibility: Only push-based sources will be compatible with the Kafka buffer. Pull-based sources, like S3, will not be supported. Incompatible configurations will trigger an error message during pipeline startup.
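The proposed callback contract can be sketched as an abstract interface. This is an assumption-laden rendering: the real RawByteHandler would be a Java interface, and the `JsonPushSource` class here is a made-up example of a push-based source implementing it.

```python
import json
from abc import ABC, abstractmethod

class RawByteHandler(ABC):
    """Hypothetical shape of the proposed interface (the real one is Java)."""

    @abstractmethod
    def deserialize_bytes(self, raw: bytes):
        """Turn raw bytes read back from Kafka into an event."""

class JsonPushSource(RawByteHandler):
    """Invented push-based source: it hands the buffer raw bytes on ingest,
    and decodes them again when the buffer reads them out of Kafka."""

    def deserialize_bytes(self, raw: bytes):
        return json.loads(raw)

source = JsonPushSource()
event = source.deserialize_bytes(b'{"status": "ok"}')
```

A pull-based source such as S3 has no equivalent hook, which is why incompatible configurations would fail at pipeline startup rather than at runtime.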

Encryption

The Kafka buffer will offer optional encryption via KMS:

  • Before writing to the buffer, the KMS GenerateDataKeyPair API will be invoked to obtain a data key pair.
  • The KMS Encrypt API will then encrypt the private key, which will be sent to Kafka alongside the encrypted data.
  • During data reading, the KMS Decrypt API will decrypt the private key, which will then decrypt the data.
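The envelope pattern behind those three steps can be illustrated with a toy sketch. Everything here is a stand-in: the XOR-based cipher is not real cryptography, the `MASTER_KEY` held locally plays the role of the key material that never leaves KMS, and real code would call the AWS SDK rather than these invented `produce`/`consume` helpers.

```python
import hashlib
import os

def _xor(data: bytes, key: bytes) -> bytes:
    """XOR data against a SHA-256-derived keystream (toy cipher, NOT secure)."""
    stream = b""
    counter = 0
    while len(stream) < len(data):
        stream += hashlib.sha256(key + counter.to_bytes(8, "big")).digest()
        counter += 1
    return bytes(a ^ b for a, b in zip(data, stream))

# Plays the role of key material that never leaves KMS.
MASTER_KEY = os.urandom(32)

def produce(event: bytes):
    """Writer path: make a fresh data key, encrypt the event, wrap the key."""
    data_key = os.urandom(32)                 # stand-in for key generation
    ciphertext = _xor(event, data_key)
    wrapped_key = _xor(data_key, MASTER_KEY)  # stand-in for the KMS Encrypt call
    return ciphertext, wrapped_key            # both travel to Kafka together

def consume(ciphertext: bytes, wrapped_key: bytes) -> bytes:
    """Reader path: unwrap the key via 'KMS', then decrypt the event."""
    data_key = _xor(wrapped_key, MASTER_KEY)  # stand-in for the KMS Decrypt call
    return _xor(ciphertext, data_key)

ct, wk = produce(b'{"message": "hello"}')
plain = consume(ct, wk)
```

The design choice worth noting is that only the wrapped (encrypted) key is stored next to the data in Kafka, so a reader must be able to call KMS to recover anything.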

Metrics

The Kafka buffer will incorporate the standard buffer metrics, as well as the metrics reported by Kafka Source/Sink:

  • numberOfPositiveAcknowledgements
  • numberOfNegativeAcknowledgements
  • numberOfRecordsFailedToParse
  • numberOfBufferSizeOverflows
  • numberOfPollAuthErrors
  • numberOfRecordsCommitted
  • numberOfRecordsConsumed
  • numberOfBytesConsumed
dlvenable added the labels "enhancement" and "plugin - buffer" and removed "untriaged" on Sep 15, 2023
dlvenable (Member) commented:

@JonahCalvo, this is an exciting feature. Thanks for putting this proposal together.

What is the workers property? The Data Prepper pipeline worker reads from the Buffer. And it can have multiple threads which are defined in the pipeline. So it seems that each request to the Buffer should be happening on one of those threads, not in another "worker" thread.

dlvenable (Member) commented:

@JonahCalvo, We can also simplify the buffer name. Use kafka instead of kafka_buffer.

dlvenable (Member) commented:

This feature is completed by quite a few PRs.

dlvenable modified the milestones: v2.7, v2.6 (Nov 16, 2023)
canob commented Nov 29, 2023

Hi everyone,

Can somebody give us a working sample configuration of the Kafka buffer to test? I tried the example in the first post, but it is not working. :-(

From my tests and reading the logs:

  • The correct name of the buffer plugin is "kafka", not "kafka_buffer".
  • The "acknowledgments" flag does not exist.
  • The "topic" flag does not exist; the correct flag is "topics", and supposedly you need to define it as an array, but it is not working with "topics": ["topicname"].

Thanks for your help!

canob commented Dec 1, 2023

Answering myself: the correct way to configure the Kafka buffer is this one:

  buffer:
    kafka:
      bootstrap_servers:
        - redpanda-0:9092
      topics:
        - name: dns-ip-pipeline-buffer
          group_id: data-prepper-group

Regards!
