[BUG] ChangeVisibilityTimeout call failure during pipeline shutdown. #4575

Closed
chenqi0805 opened this issue May 31, 2024 · 2 comments · Fixed by #4748
Labels: bug

@chenqi0805
Collaborator

Describe the bug
The SqsClient is closed too early, while an acknowledgment callback thread is still making a change-visibility-timeout call (`changeMessageVisibility`):

2024-05-31T20:29:16.968 [acknowledgement-callback-10] ERROR org.opensearch.dataprepper.plugins.source.s3.SqsWorker - Failed to set visibility timeout for message xxxxxxxxxx to 60
java.lang.IllegalStateException: Connection pool shut down
	at org.apache.http.util.Asserts.check(Asserts.java:34) ~[Apache-HttpComponents-HttpCore-4.4.x.jar:?]
	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
	at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75) ~[apache-client-2.23.3.jar:?]
	at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57) ~[apache-client-2.23.3.jar:?]
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[Apache-HttpComponents-HttpClient-4.5.x.jar:?]
	at software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72) ~[apache-client-2.23.3.jar:?]
	at software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254) ~[apache-client-2.23.3.jar:?]
	at software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104) ~[apache-client-2.23.3.jar:?]
	at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231) ~[apache-client-2.23.3.jar:?]
	at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228) ~[apache-client-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:99) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:79) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:57) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:40) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:224) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45) ~[sdk-core-2.23.3.jar:?]
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53) ~[aws-core-2.23.3.jar:?]
	at software.amazon.awssdk.services.sqs.DefaultSqsClient.changeMessageVisibility(DefaultSqsClient.java:544) ~[sqs-2.23.3.jar:?]
	at org.opensearch.dataprepper.plugins.source.s3.SqsWorker.lambda$processS3EventNotificationRecords$1(SqsWorker.java:286) ~[s3-source-2.8.0-SNAPSHOT.jar:?]
	at org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSet.checkProgress(DefaultAcknowledgementSet.java:73) ~[data-prepper-core-2.8.0-SNAPSHOT.jar:?]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.base/java.lang.Thread.run(Thread.java:829) [?:?]

To Reproduce
Steps to reproduce the behavior:

  1. Set up an S3 pipeline with acknowledgments and a visibility timeout enabled.
  2. Shut down the pipeline.

Expected behavior
The underlying error (connection pool shut down) should not occur when the pipeline shuts down.

Additional context
Approaches:

  1. Check a shutdown flag before changing the visibility timeout in the acknowledgment thread. This approach could increase duplicate messages during shutdown.
  2. Shut down the SQS threads first, then wait longer for all acknowledgments to finish before shutting down the SqsClient and the pipeline. This approach can reduce duplicate messages during shutdown, but it lengthens shutdown latency.
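
To make approach 1 concrete, here is a minimal sketch of a flag-guarded visibility change. It is not the actual `SqsWorker` code: `VisibilityChanger` and `GuardedVisibilityUpdater` are hypothetical names, and the interface stands in for the single `SqsClient` call involved so the example runs without the AWS SDK.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the one SqsClient call that matters here.
interface VisibilityChanger {
    void changeVisibility(String receiptHandle, int timeoutSeconds);
}

// Sketch of approach 1: the acknowledgment callback consults a stop flag
// before it touches the client.
class GuardedVisibilityUpdater {
    private final VisibilityChanger client;
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    GuardedVisibilityUpdater(VisibilityChanger client) {
        this.client = client;
    }

    // Called by the shutdown path before the client is closed.
    void stop() {
        stopped.set(true);
    }

    // Called from the acknowledgment callback thread.
    // Returns true if the call was attempted, false if it was skipped.
    boolean extendVisibility(String receiptHandle, int timeoutSeconds) {
        if (stopped.get()) {
            return false; // skipped: the message may be redelivered later
        }
        client.changeVisibility(receiptHandle, timeoutSeconds);
        return true;
    }
}
```

The check and the call are not atomic, so a callback that passes the check just before the client closes can still fail. The flag narrows the window but does not remove it, which is why the ordering in approach 2 also matters.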
@wanpdsantos

I am having the same problem.

@dlvenable
Member

I've been investigating this some. I created #4740 to provide a long-term solution which uses Data Prepper core to hold on acknowledgements. In the meantime, I'm going to make some more minor changes which avoid the error messages. But, I won't hold the source open to allow the acknowledgements to flush because that will require more core changes.

dlvenable added a commit to dlvenable/data-prepper that referenced this issue Jul 17, 2024
…peline from shutting down and no longer results in failures. Resolves opensearch-project#4575

The previous approach to shutting down the SQS thread closed the SqsClient. However, with acknowledgments enabled, asynchronous callbacks would result in further attempts to either ChangeVisibilityTimeout or DeleteMessages. These were failing because the client was closed. Also, the threads would remain and prevent Data Prepper from correctly shutting down. With this change, we correctly stop each processing thread. Then we close the client. Additionally, the SqsWorker now checks that it is not stopped before attempting to change the message visibility or delete messages.

Additionally, I found some missing test cases. Also, modifying this code and especially unit testing it is becoming more difficult, so I performed some refactoring to move message parsing out of the SqsWorker.

Signed-off-by: David Venable <[email protected]>
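
The ordering described in this commit message (stop the processing threads, then close the client) can be sketched roughly as follows. This illustrates the pattern and is not the code from the commit: `OrderedShutdownSketch` and `QuietCloseable` are hypothetical names, with `QuietCloseable` standing in for a client whose `close()` throws no checked exception.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a client whose close() throws no checked exception.
interface QuietCloseable extends AutoCloseable {
    @Override
    void close();
}

class OrderedShutdownSketch {
    private final ExecutorService workers;
    private final QuietCloseable client;

    OrderedShutdownSketch(ExecutorService workers, QuietCloseable client) {
        this.workers = workers;
        this.client = client;
    }

    // Stops the workers first and closes the client last.
    // Returns true if every worker finished within the timeout.
    boolean stop(long timeout, TimeUnit unit) {
        workers.shutdown(); // no new tasks; tasks already running continue
        boolean drained = false;
        try {
            drained = workers.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
        if (!drained) {
            workers.shutdownNow(); // interrupt tasks that did not finish in time
        }
        client.close(); // closed only after the workers were told to stop
        return drained;
    }
}
```

A longer timeout gives in-flight work more time to finish at the cost of a slower shutdown, the trade-off noted under approach 2 in the issue description.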
@github-project-automation github-project-automation bot moved this from Unplanned to Done in Data Prepper Tracking Board Jul 19, 2024
kkondaka pushed a commit to kkondaka/kk-data-prepper-f2 that referenced this issue Jul 23, 2024
…peline from shutting down and no longer results in failures. Resolves opensearch-project#4575 (opensearch-project#4748)

The previous approach to shutting down the SQS thread closed the SqsClient. However, with acknowledgments enabled, asynchronous callbacks would result in further attempts to either ChangeVisibilityTimeout or DeleteMessages. These were failing because the client was closed. Also, the threads would remain and prevent Data Prepper from correctly shutting down. With this change, we correctly stop each processing thread. Then we close the client. Additionally, the SqsWorker now checks that it is not stopped before attempting to change the message visibility or delete messages.

Additionally, I found some missing test cases. Also, modifying this code and especially unit testing it is becoming more difficult, so I performed some refactoring to move message parsing out of the SqsWorker.

Signed-off-by: David Venable <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>