Support flushing sinks and completing acknowledgements on shutdown #4740

Open
dlvenable opened this issue Jul 16, 2024 · 0 comments
Labels
enhancement New feature or request

@dlvenable
Member

Is your feature request related to a problem? Please describe.

On shutdown, Data Prepper currently waits for a period of time to flush the buffer: it waits until either the entire buffer drains or the drain timeout expires.
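A minimal Java sketch of this buffer-only check may make the gap easier to see. It assumes a hypothetical isBufferEmpty probe and is not Data Prepper's actual shutdown code; the point is that the loop's only exit conditions are "buffer empty" and "timeout expired".

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Sketch only: hypothetical names, not Data Prepper's real shutdown code.
public final class BufferOnlyDrain {

    private BufferOnlyDrain() {}

    /**
     * Polls until the buffer reports empty or the drain timeout elapses.
     * Returns true if the buffer drained, false on timeout or interrupt.
     */
    public static boolean awaitDrain(BooleanSupplier isBufferEmpty,
                                     Duration drainTimeout,
                                     Duration pollInterval) {
        final long deadline = System.nanoTime() + drainTimeout.toNanos();
        while (!isBufferEmpty.getAsBoolean()) {
            // Overflow-safe comparison of System.nanoTime() values.
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out with records still buffered
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
        // Nothing here asks whether sinks have acknowledged the drained records.
        return true;
    }

    public static void main(String[] args) {
        // The buffer is already empty, so this returns immediately,
        // even if acknowledgement sets are still open downstream.
        System.out.println(awaitDrain(() -> true, Duration.ofMinutes(5), Duration.ofMillis(100)));
    }
}
```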

This logic does not account for end-to-end acknowledgements. If a sink is slow to send acknowledgements but the buffer is already empty, Data Prepper considers the pipeline ready for shutdown.

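Because of this, Data Prepper may produce duplicate data when it is shut down in the middle of reading an S3 object (e.g., half of the file has been sent to the sink, but Data Prepper shuts down before the second half is completed, so the object's acknowledgement never completes and the whole object is read again later).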

Describe the solution you'd like

Update Data Prepper to track the acknowledgement sets for a given pipeline, and take the outstanding sets into account during shutdown so that they complete before the pipeline stops.
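One possible shape for this, sketched in Java under stated assumptions: a hypothetical PipelineAcknowledgementTracker counts open acknowledgement sets per pipeline name, with sources calling onCreated when they create a set and onCompleted when it completes or times out. Shutdown then waits until the buffer is empty and the count reaches zero, still bounded by a timeout. None of these names exist in Data Prepper; they only illustrate the idea.

```java
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical per-pipeline tracker of open acknowledgement sets (sketch only).
public final class PipelineAcknowledgementTracker {
    private final ConcurrentHashMap<String, AtomicInteger> open = new ConcurrentHashMap<>();

    /** A source created an acknowledgement set for this pipeline. */
    public void onCreated(String pipeline) {
        open.computeIfAbsent(pipeline, p -> new AtomicInteger()).incrementAndGet();
    }

    /** An acknowledgement set completed (positively, negatively, or by timing out). */
    public void onCompleted(String pipeline) {
        AtomicInteger count = open.get(pipeline);
        if (count != null) {
            count.updateAndGet(c -> Math.max(0, c - 1)); // never go negative
        }
    }

    public int openSets(String pipeline) {
        AtomicInteger count = open.get(pipeline);
        return count == null ? 0 : count.get();
    }

    /**
     * Ready only when the buffer is empty AND no acknowledgement sets are open.
     * Returns true if that happened before the timeout, false otherwise.
     */
    public boolean awaitShutdownReady(String pipeline, BooleanSupplier isBufferEmpty,
                                      Duration timeout, Duration pollInterval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (!(isBufferEmpty.getAsBoolean() && openSets(pipeline) == 0)) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // give up after the shutdown timeout
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        PipelineAcknowledgementTracker tracker = new PipelineAcknowledgementTracker();
        tracker.onCreated("sqs-pipeline"); // e.g. one S3 object still in flight
        Duration timeout = Duration.ofMillis(200);
        Duration poll = Duration.ofMillis(50);
        // Empty buffer, but one set is open: waits for the timeout, then prints false.
        System.out.println(tracker.awaitShutdownReady("sqs-pipeline", () -> true, timeout, poll));
        tracker.onCompleted("sqs-pipeline");
        // Nothing open any more: prints true immediately.
        System.out.println(tracker.awaitShutdownReady("sqs-pipeline", () -> true, timeout, poll));
    }
}
```

Polling keeps the sketch short; a real implementation might instead signal a condition or latch when the count for a pipeline reaches zero.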

Describe alternatives you've considered (Optional)

None

Additional context

I was working toward a solution to #4575, which would allow the S3 source to keep the SQS message visibility open while the sink flushes. In the process, I found that shutdown does not wait for the sink at all.

I used this pipeline and a local hold_forever sink (see #4737) to demonstrate the problem:

sqs-pipeline:
  workers: 2
  delay: 100
  source:
    s3:
      notification_type: sqs
      compression: gzip
      acknowledgments: true
      codec:
        csv:
          delimiter: ' '
      sqs:
        queue_url: QUEUE
      aws:
        region: us-east-2
        sts_role_arn: ROLE

  processor:

  sink:
    - hold_forever:
        output_frequency: 5s

data-prepper-config.yaml:

ssl: false
serverPort: 4900
processor_shutdown_timeout: 'PT5M'

Data Prepper shut down immediately. It should have waited 5 minutes (the processor_shutdown_timeout), because the hold_forever sink does not send any acknowledgements.
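A simplified Java model of this repro, assuming the buffer is already empty and that the hold_forever sink never completes its acknowledgement set, so nothing changes while shutdown waits. PT5M is an ISO-8601 duration, which java.time.Duration.parse accepts. The class and method names are hypothetical.

```java
import java.time.Duration;

// Hypothetical model of the hold_forever repro; not Data Prepper code.
public final class HoldForeverScenario {

    private HoldForeverScenario() {}

    /**
     * How long shutdown blocks, assuming the pipeline state never changes
     * while waiting (hold_forever never acknowledges anything).
     */
    public static Duration shutdownWait(boolean bufferEmpty, int openAckSets,
                                        boolean waitForAcks, Duration timeout) {
        boolean ready = bufferEmpty && (!waitForAcks || openAckSets == 0);
        // If not ready now, it never becomes ready, so the full timeout elapses.
        return ready ? Duration.ZERO : timeout;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.parse("PT5M"); // processor_shutdown_timeout
        // Current behavior: only the buffer is checked.
        System.out.println(shutdownWait(true, 1, false, timeout)); // PT0S
        // Desired behavior: the open acknowledgement set is also considered.
        System.out.println(shutdownWait(true, 1, true, timeout));  // PT5M
    }
}
```

Under the buffer-only check the wait is zero, matching the immediate shutdown observed above; with acknowledgement tracking, shutdown would block for the full processor_shutdown_timeout.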