Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race in AzureClient factory fetch #16525

Merged
merged 3 commits into from
Jun 1, 2024
Merged

Conversation

cryptoe
Copy link
Contributor

@cryptoe cryptoe commented May 31, 2024

Found a regression in Azure connector due to #15287 which resulted in a race. When multiple threads are fetching the azureClient, the hashmap throws a ConcurrentModificationException

 UnknownError: java.util.ConcurrentModificationException
java.util.ConcurrentModificationException
	at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1135)
	at org.apache.druid.storage.azure.AzureClientFactory.getBlobServiceClient(AzureClientFactory.java:58)
	at org.apache.druid.storage.azure.AzureStorage.getBlobServiceClient(AzureStorage.java:269)
	at org.apache.druid.storage.azure.AzureStorage.getOrCreateBlobContainerClient(AzureStorage.java:303)
	at org.apache.druid.storage.azure.AzureStorage.getBlockBlobOutputStream(AzureStorage.java:131)
	at org.apache.druid.storage.azure.output.AzureStorageConnector.write(AzureStorageConnector.java:140)
	at org.apache.druid.msq.shuffle.output.DurableStorageTaskOutputChannelFactory.openChannel(DurableStorageTaskOutputChannelFactory.java:100)
	at org.apache.druid.msq.indexing.CountingOutputChannelFactory.openChannel(CountingOutputChannelFactory.java:47)
	at org.apache.druid.msq.exec.WorkerImpl$ShufflePipelineBuilder.lambda$mix$0(WorkerImpl.java:1452)
	at org.apache.druid.msq.exec.WorkerImpl$ShufflePipelineBuilder.lambda$push$13(WorkerImpl.java:1804)
	at org.apache.druid.msq.exec.WorkerImpl$ShufflePipelineBuilder$3.apply(WorkerImpl.java:1825)
	at org.apache.druid.msq.exec.WorkerImpl$ShufflePipelineBuilder$3.apply(WorkerImpl.java:1821)
	at com.google.common.util.concurrent.AbstractTransformFuture$AsyncTransformFuture.doTransform(AbstractTransformFuture.java:223)
	at com.google.common.util.concurrent.AbstractTransformFuture$AsyncTransformFuture.doTransform(AbstractTransformFuture.java:210)
	at com.google.common.util.concurrent.AbstractTransformFuture.run(AbstractTransformFuture.java:123)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.ImmediateFuture.addListener(ImmediateFuture.java:49)
	at com.google.common.util.concurrent.AbstractTransformFuture.create(AbstractTransformFuture.java:44)
	at com.google.common.util.concurrent.Futures.transformAsync(Futures.java:453)
	at org.apache.druid.msq.exec.WorkerImpl$ShufflePipelineBuilder.pushAsync(WorkerImpl.java:1818)
	at org.apache.druid.msq.exec.WorkerImpl$ShufflePipelineBuilder.push(WorkerImpl.java:1802)
	at org.apache.druid.msq.exec.WorkerImpl$ShufflePipelineBuilder.mix(WorkerImpl.java:1450)
	at org.apache.druid.msq.exec.WorkerImpl$RunWorkOrder.makeAndRunShuffleProcessors(WorkerImpl.java:1255)
	at org.apache.druid.msq.exec.WorkerImpl$RunWorkOrder.start(WorkerImpl.java:1078)
	at org.apache.druid.msq.exec.WorkerImpl$RunWorkOrder.access$100(WorkerImpl.java:1024)
	at org.apache.druid.msq.exec.WorkerImpl.runTask(WorkerImpl.java:412)
	at org.apache.druid.msq.exec.WorkerImpl.run(WorkerImpl.java:259)
	at org.apache.druid.msq.indexing.MSQWorkerTask.runTask(MSQWorkerTask.java:140)
	at org.apache.druid.indexing.common.task.AbstractTask.run(AbstractTask.java:179)
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:478)
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:450)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:75)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Fixed the issue using a ConcurrentHashMap and added a test which fails without this patch.

@cryptoe cryptoe added this to the 30.0.0 milestone May 31, 2024
@cryptoe cryptoe requested a review from georgew5656 May 31, 2024 13:16
@amaechler
Copy link
Contributor

Good catch @cryptoe. Were you able to reproduce this by simply configuring Azure for deep-storage plus ingesting from a blob storage, or was there a specific scenario that triggered this race condition?


final CountDownLatch latch = new CountDownLatch(threads);
ExecutorService executorService = Execs.multiThreaded(threads, "azure-client-fetcher-%d");
final AtomicReference<Exception> failureExecption = new AtomicReference<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final AtomicReference<Exception> failureExecption = new AtomicReference<>();
final AtomicReference<Exception> failureException = new AtomicReference<>();

@cryptoe
Copy link
Contributor Author

cryptoe commented Jun 1, 2024

@amaechler The race can be triggered whereever azureClient is used. So yes the scenario you mentioned can trigger this race.

@cryptoe cryptoe merged commit d091686 into apache:master Jun 1, 2024
54 checks passed
adarshsanjeev pushed a commit to adarshsanjeev/druid that referenced this pull request Jun 3, 2024
* Fix race in AzureClient factory fetch

* Fixing forbidden check.

* Renaming variable.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants