diff --git a/docs/development/extensions-core/azure.md b/docs/development/extensions-core/azure.md index c6a1c3979051..198ef3650dc1 100644 --- a/docs/development/extensions-core/azure.md +++ b/docs/development/extensions-core/azure.md @@ -33,8 +33,10 @@ To use this Apache Druid extension, [include](../../configuration/extensions.md# |--------|---------------|-----------|-------| |`druid.storage.type`|azure||Must be set.| |`druid.azure.account`||Azure Storage account name.|Must be set.| -|`druid.azure.key`||Azure Storage account key.|Optional. Either set key or sharedAccessStorageToken but not both.| -|`druid.azure.sharedAccessStorageToken`||Azure Shared Storage access token|Optional. Either set key or sharedAccessStorageToken but not both.| +|`druid.azure.key`||Azure Storage account key.|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain.| +|`druid.azure.sharedAccessStorageToken`||Azure Shared Storage access token|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain..| +|`druid.azure.useAzureCredentialsChain`|Use [DefaultAzureCredential](https://learn.microsoft.com/en-us/java/api/overview/azure/identity-readme?view=azure-java-stable) for authentication|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain.|False| +|`druid.azure.managedIdentityClientId`|If you want to use managed identity authentication in the `DefaultAzureCredential`, `useAzureCredentialsChain` must be true.||Optional.| |`druid.azure.container`||Azure Storage container name.|Must be set.| |`druid.azure.prefix`|A prefix string that will be prepended to the blob names for the segments published to Azure deep storage| |""| |`druid.azure.protocol`|the protocol to use|http or https|https| diff --git a/extensions-core/azure-extensions/pom.xml b/extensions-core/azure-extensions/pom.xml index 0d49cd1867cb..33a9abbd0699 100644 --- a/extensions-core/azure-extensions/pom.xml +++ b/extensions-core/azure-extensions/pom.xml @@ -33,6 +33,17 @@ ../../pom.xml + + + + com.azure + azure-sdk-bom + 1.2.19 + pom + import + + + org.apache.druid @@ -40,29 +51,25 @@ ${project.parent.version} provided - - com.microsoft.azure - azure-storage - 8.6.0 - - - org.slf4j - slf4j-api - - - com.fasterxml.jackson.core - jackson-core - - - org.apache.commons - commons-lang3 - - - com.google.guava - guava - - + com.azure + azure-identity + + + com.azure + azure-storage-blob + + + com.azure + azure-storage-blob-batch + + + com.azure + azure-storage-common + + + com.azure + azure-core com.fasterxml.jackson.module @@ -129,7 +136,11 @@ commons-lang provided - + + org.apache.commons + commons-lang3 + provided + junit diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java index 02040d439156..8cb6ff149760 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java @@ -19,14 +19,14 @@ package org.apache.druid.data.input.azure; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.specialized.BlockBlobClient; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.CloudBlob; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.CloudObjectInputSource; @@ -42,7 +42,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.net.URI; -import java.net.URISyntaxException; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -150,7 +149,7 @@ public Iterator getDescriptorIteratorForPrefixes(List pre blob.getBlobLength() ); } - catch (URISyntaxException | StorageException e) { + catch (BlobStorageException e) { throw new RuntimeException(e); } } @@ -161,14 +160,14 @@ public Iterator getDescriptorIteratorForPrefixes(List pre public long getObjectSize(CloudObjectLocation location) { try { - final CloudBlob blobWithAttributes = storage.getBlockBlobReferenceWithAttributes( + final BlockBlobClient blobWithAttributes = storage.getBlockBlobReferenceWithAttributes( location.getBucket(), location.getPath() ); - return blobWithAttributes.getProperties().getLength(); + return blobWithAttributes.getProperties().getBlobSize(); } - catch (URISyntaxException | StorageException e) { + catch (BlobStorageException e) { throw new RuntimeException(e); } } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureAccountConfig.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureAccountConfig.java index 235ae6f3c609..5826c68f1b78 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureAccountConfig.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureAccountConfig.java @@ -46,6 +46,12 @@ public class AzureAccountConfig @JsonProperty private String sharedAccessStorageToken; + @JsonProperty + private String managedIdentityClientId; + + @JsonProperty + private Boolean useAzureCredentialsChain = Boolean.FALSE; + @SuppressWarnings("unused") // Used by Jackson deserialization? public void setProtocol(String protocol) { @@ -94,9 +100,25 @@ public String getSharedAccessStorageToken() return sharedAccessStorageToken; } + public Boolean getUseAzureCredentialsChain() + { + return useAzureCredentialsChain; + } + + public String getManagedIdentityClientId() + { + return managedIdentityClientId; + } + + @SuppressWarnings("unused") // Used by Jackson deserialization? public void setSharedAccessStorageToken(String sharedAccessStorageToken) { this.sharedAccessStorageToken = sharedAccessStorageToken; } + + public void setUseAzureCredentialsChain(Boolean useAzureCredentialsChain) + { + this.useAzureCredentialsChain = useAzureCredentialsChain; + } } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java index 91af1140cb5f..dfbf6cb33856 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java @@ -19,15 +19,14 @@ package org.apache.druid.storage.azure; +import com.azure.storage.blob.models.BlobStorageException; import com.google.common.io.ByteSource; import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.AssistedInject; -import com.microsoft.azure.storage.StorageException; import org.apache.druid.java.util.common.logger.Logger; import java.io.IOException; import java.io.InputStream; -import java.net.URISyntaxException; /** * Used for getting an {@link InputStream} to an azure resource. @@ -62,7 +61,7 @@ public InputStream openStream(long offset) throws IOException try { return azureStorage.getBlockBlobInputStream(offset, containerName, blobPath); } - catch (StorageException | URISyntaxException e) { + catch (BlobStorageException e) { if (AzureUtils.AZURE_RETRY.apply(e)) { throw new IOException("Recoverable exception", e); } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java new file mode 100644 index 000000000000..365b9d40dab1 --- /dev/null +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.azure; + +import com.azure.core.http.policy.ExponentialBackoffOptions; +import com.azure.core.http.policy.RetryOptions; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.common.StorageSharedKeyCredential; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +/** + * Factory class for generating BlobServiceClient objects. + */ +public class AzureClientFactory +{ + + private final AzureAccountConfig config; + private final Map cachedBlobServiceClients; + + public AzureClientFactory(AzureAccountConfig config) + { + this.config = config; + this.cachedBlobServiceClients = new HashMap<>(); + } + + // It's okay to store clients in a map here because all the configs for specifying azure retries are static, and there are only 2 of them. + // The 2 configs are AzureAccountConfig.maxTries and AzureOutputConfig.maxRetrr. + // We will only ever have at most 2 clients in cachedBlobServiceClients. + public BlobServiceClient getBlobServiceClient(Integer retryCount) + { + if (!cachedBlobServiceClients.containsKey(retryCount)) { + BlobServiceClientBuilder clientBuilder = getAuthenticatedBlobServiceClientBuilder() + .retryOptions(new RetryOptions( + new ExponentialBackoffOptions() + .setMaxRetries(retryCount != null ? retryCount : config.getMaxTries()) + .setBaseDelay(Duration.ofMillis(1000)) + .setMaxDelay(Duration.ofMillis(60000)) + )); + cachedBlobServiceClients.put(retryCount, clientBuilder.buildClient()); + } + + return cachedBlobServiceClients.get(retryCount); + } + + private BlobServiceClientBuilder getAuthenticatedBlobServiceClientBuilder() + { + BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder() + .endpoint("https://" + config.getAccount() + ".blob.core.windows.net"); + + if (config.getKey() != null) { + clientBuilder.credential(new StorageSharedKeyCredential(config.getAccount(), config.getKey())); + } else if (config.getSharedAccessStorageToken() != null) { + clientBuilder.sasToken(config.getSharedAccessStorageToken()); + } else if (config.getUseAzureCredentialsChain()) { + // We might not use the managed identity client id in the credential chain but we can just set it here and it will no-op. + DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder() + .managedIdentityClientId(config.getManagedIdentityClientId()); + clientBuilder.credential(defaultAzureCredentialBuilder.build()); + } + return clientBuilder; + } +} diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java index c2a696c3d757..95ca3c08ae87 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java @@ -19,16 +19,12 @@ package org.apache.druid.storage.azure; +import com.azure.storage.blob.models.BlobItem; import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.AssistedInject; -import com.microsoft.azure.storage.ResultContinuation; -import com.microsoft.azure.storage.ResultSegment; -import com.microsoft.azure.storage.blob.ListBlobItem; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.storage.azure.blob.CloudBlobHolder; -import org.apache.druid.storage.azure.blob.ListBlobItemHolder; -import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory; import java.net.URI; import java.util.Iterator; @@ -42,36 +38,28 @@ public class AzureCloudBlobIterator implements Iterator { private static final Logger log = new Logger(AzureCloudBlobIterator.class); private final AzureStorage storage; - private final ListBlobItemHolderFactory blobItemDruidFactory; private final Iterator prefixesIterator; private final int maxListingLength; - - private ResultSegment result; private String currentContainer; private String currentPrefix; - private ResultContinuation continuationToken; private CloudBlobHolder currentBlobItem; - private Iterator blobItemIterator; + private Iterator blobItemIterator; private final AzureAccountConfig config; @AssistedInject AzureCloudBlobIterator( AzureStorage storage, - ListBlobItemHolderFactory blobItemDruidFactory, AzureAccountConfig config, @Assisted final Iterable prefixes, @Assisted final int maxListingLength ) { this.storage = storage; - this.blobItemDruidFactory = blobItemDruidFactory; this.config = config; this.prefixesIterator = prefixes.iterator(); this.maxListingLength = maxListingLength; - this.result = null; this.currentContainer = null; this.currentPrefix = null; - this.continuationToken = null; this.currentBlobItem = null; this.blobItemIterator = null; @@ -108,8 +96,6 @@ private void prepareNextRequest() log.debug("currentUri: %s\ncurrentContainer: %s\ncurrentPrefix: %s", currentUri, currentContainer, currentPrefix ); - result = null; - continuationToken = null; } private void fetchNextBatch() @@ -121,14 +107,13 @@ private void fetchNextBatch() currentContainer, currentPrefix ); - result = AzureUtils.retryAzureOperation(() -> storage.listBlobsWithPrefixInContainerSegmented( + // We don't need to iterate by page because the client handles this, it will fetch the next page when necessary. + blobItemIterator = storage.listBlobsWithPrefixInContainerSegmented( currentContainer, currentPrefix, - continuationToken, - maxListingLength - ), config.getMaxTries()); - continuationToken = result.getContinuationToken(); - blobItemIterator = result.getResults().iterator(); + maxListingLength, + config.getMaxTries() + ).stream().iterator(); } catch (Exception e) { throw new RE( @@ -146,19 +131,15 @@ private void fetchNextBatch() */ private void advanceBlobItem() { - while (blobItemIterator.hasNext() || continuationToken != null || prefixesIterator.hasNext()) { + while (prefixesIterator.hasNext() || blobItemIterator.hasNext()) { while (blobItemIterator.hasNext()) { - ListBlobItemHolder blobItem = blobItemDruidFactory.create(blobItemIterator.next()); - /* skip directory objects */ - if (blobItem.isCloudBlob() && blobItem.getCloudBlob().getBlobLength() > 0) { - currentBlobItem = blobItem.getCloudBlob(); + BlobItem blobItem = blobItemIterator.next(); + if (!blobItem.isPrefix() && blobItem.getProperties().getContentLength() > 0) { + currentBlobItem = new CloudBlobHolder(blobItem, currentContainer); return; } } - - if (continuationToken != null) { - fetchNextBatch(); - } else if (prefixesIterator.hasNext()) { + if (prefixesIterator.hasNext()) { prepareNextRequest(); fetchNextBatch(); } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java index bdfdb9937479..0b8ccf6daeea 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java @@ -19,9 +19,9 @@ package org.apache.druid.storage.azure; +import com.azure.storage.blob.models.BlobStorageException; import com.google.common.base.Predicates; import com.google.inject.Inject; -import com.microsoft.azure.storage.StorageException; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -30,7 +30,6 @@ import org.apache.druid.timeline.DataSegment; import java.io.IOException; -import java.net.URISyntaxException; import java.nio.file.Paths; import java.util.Map; @@ -76,13 +75,8 @@ public void kill(DataSegment segment) throws SegmentLoadingException try { azureStorage.emptyCloudBlobDirectory(containerName, dirPath); } - catch (StorageException e) { - Object extendedInfo = - e.getExtendedErrorInformation() == null ? null : e.getExtendedErrorInformation().getErrorMessage(); - throw new SegmentLoadingException(e, "Couldn't kill segment[%s]: [%s]", segment.getId(), extendedInfo); - } - catch (URISyntaxException e) { - throw new SegmentLoadingException(e, "Couldn't kill segment[%s]: [%s]", segment.getId(), e.getReason()); + catch (BlobStorageException e) { + throw new SegmentLoadingException(e, "Couldn't kill segment[%s]: [%s]", segment.getId(), e.getMessage()); } } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java index 9f97256b1da8..e2ce8bbb88d5 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java @@ -19,11 +19,11 @@ package org.apache.druid.storage.azure; +import com.azure.storage.blob.models.BlobStorageException; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; -import com.microsoft.azure.storage.StorageException; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.SegmentUtils; @@ -35,7 +35,6 @@ import java.io.File; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import java.util.List; import java.util.Map; @@ -142,10 +141,7 @@ public DataSegment pushToPath(File indexFilesDir, DataSegment segment, String st final File outFile = zipOutFile = File.createTempFile("index", ".zip"); final long size = CompressionUtils.zip(indexFilesDir, zipOutFile); - return AzureUtils.retryAzureOperation( - () -> uploadDataSegment(segment, binaryVersion, size, outFile, azurePath), - accountConfig.getMaxTries() - ); + return uploadDataSegment(segment, binaryVersion, size, outFile, azurePath); } catch (Exception e) { throw new RuntimeException(e); @@ -181,9 +177,9 @@ DataSegment uploadDataSegment( final File compressedSegmentData, final String azurePath ) - throws StorageException, IOException, URISyntaxException + throws BlobStorageException, IOException { - azureStorage.uploadBlockBlob(compressedSegmentData, segmentConfig.getContainer(), azurePath); + azureStorage.uploadBlockBlob(compressedSegmentData, segmentConfig.getContainer(), azurePath, accountConfig.getMaxTries()); final DataSegment outSegment = segment .withSize(size) diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java index fc1a128e11e4..aa7bef718cfc 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java @@ -19,21 +19,24 @@ package org.apache.druid.storage.azure; +import com.azure.core.http.rest.PagedIterable; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.batch.BlobBatchClient; +import com.azure.storage.blob.batch.BlobBatchClientBuilder; +import com.azure.storage.blob.batch.BlobBatchStorageException; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.models.BlobRange; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.DeleteSnapshotsOptionType; +import com.azure.storage.blob.models.ListBlobsOptions; +import com.azure.storage.blob.models.ParallelTransferOptions; +import com.azure.storage.blob.options.BlobInputStreamOptions; +import com.azure.storage.blob.options.BlockBlobOutputStreamOptions; +import com.azure.storage.blob.specialized.BlockBlobClient; +import com.azure.storage.common.Utility; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Supplier; -import com.microsoft.azure.storage.ResultContinuation; -import com.microsoft.azure.storage.ResultSegment; -import com.microsoft.azure.storage.RetryExponentialRetry; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.BlobDeleteBatchOperation; -import com.microsoft.azure.storage.blob.BlobListingDetails; -import com.microsoft.azure.storage.blob.BlobRequestOptions; -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.CloudBlockBlob; -import com.microsoft.azure.storage.blob.DeleteSnapshotsOption; -import com.microsoft.azure.storage.blob.ListBlobItem; +import com.google.common.collect.Lists; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.logger.Logger; @@ -43,9 +46,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.URISyntaxException; +import java.time.Duration; import java.util.ArrayList; -import java.util.EnumSet; import java.util.List; /** @@ -53,58 +55,43 @@ */ public class AzureStorage { - private static final boolean USE_FLAT_BLOB_LISTING = true; // Default value from Azure library private static final int DELTA_BACKOFF_MS = 30_000; private static final Logger log = new Logger(AzureStorage.class); - /** - * Some segment processing tools such as DataSegmentKiller are initialized when an ingestion job starts - * if the extension is loaded, even when the implementation of DataSegmentKiller is not used. As a result, - * if we have a CloudBlobClient instead of a supplier of it, it can cause unnecessary config validation - * against Azure storage even when it's not used at all. To perform the config validation - * only when it is actually used, we use a supplier. - * - * See OmniDataSegmentKiller for how DataSegmentKillers are initialized. - */ - private final Supplier cloudBlobClient; + private final AzureClientFactory azureClientFactory; public AzureStorage( - Supplier cloudBlobClient + AzureClientFactory azureClientFactory ) { - this.cloudBlobClient = cloudBlobClient; + this.azureClientFactory = azureClientFactory; } public List emptyCloudBlobDirectory(final String containerName, final String virtualDirPath) - throws StorageException, URISyntaxException + throws BlobStorageException { return emptyCloudBlobDirectory(containerName, virtualDirPath, null); } public List emptyCloudBlobDirectory(final String containerName, final String virtualDirPath, final Integer maxAttempts) - throws StorageException, URISyntaxException + throws BlobStorageException { List deletedFiles = new ArrayList<>(); - CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName); - - Iterable blobItems = container.listBlobs( - virtualDirPath, - USE_FLAT_BLOB_LISTING, - null, - getRequestOptionsWithRetry(maxAttempts), - null - ); + BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); - for (ListBlobItem blobItem : blobItems) { - CloudBlob cloudBlob = (CloudBlob) blobItem; - log.debug("Removing file[%s] from Azure.", cloudBlob.getName()); - if (cloudBlob.deleteIfExists(DeleteSnapshotsOption.NONE, null, getRequestOptionsWithRetry(maxAttempts), null)) { - deletedFiles.add(cloudBlob.getName()); - } - } + // https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blobs-list The new client uses flat listing by default. + PagedIterable blobItems = blobContainerClient.listBlobs(new ListBlobsOptions().setPrefix(virtualDirPath), Duration.ofMillis(DELTA_BACKOFF_MS)); + + blobItems.iterableByPage().forEach(page -> { + page.getElements().forEach(blob -> { + if (blobContainerClient.getBlobClient(blob.getName()).deleteIfExists()) { + deletedFiles.add(blob.getName()); + } + }); + }); if (deletedFiles.isEmpty()) { log.warn("No files were deleted on the following Azure path: [%s]", virtualDirPath); @@ -113,12 +100,15 @@ public List emptyCloudBlobDirectory(final String containerName, final St return deletedFiles; } - public void uploadBlockBlob(final File file, final String containerName, final String blobPath) - throws IOException, StorageException, URISyntaxException + public void uploadBlockBlob(final File file, final String containerName, final String blobPath, final Integer maxAttempts) + throws IOException, BlobStorageException { - CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName); + BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); + try (FileInputStream stream = new FileInputStream(file)) { - container.getBlockBlobReference(blobPath).upload(stream, file.length()); + // By default this creates a Block blob, no need to use a specific Block blob client. + // We also need to urlEncode the path to handle special characters. + blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).upload(stream, file.length()); } } @@ -127,159 +117,123 @@ public OutputStream getBlockBlobOutputStream( final String blobPath, @Nullable final Integer streamWriteSizeBytes, Integer maxAttempts - ) throws URISyntaxException, StorageException + ) throws BlobStorageException { - CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName); - CloudBlockBlob blockBlobReference = container.getBlockBlobReference(blobPath); + BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); + BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).getBlockBlobClient(); - if (blockBlobReference.exists()) { + if (blockBlobClient.exists()) { throw new RE("Reference already exists"); } - + BlockBlobOutputStreamOptions options = new BlockBlobOutputStreamOptions(); if (streamWriteSizeBytes != null) { - blockBlobReference.setStreamWriteSizeInBytes(streamWriteSizeBytes); + options.setParallelTransferOptions(new ParallelTransferOptions().setBlockSizeLong(streamWriteSizeBytes.longValue())); } - - return blockBlobReference.openOutputStream(null, getRequestOptionsWithRetry(maxAttempts), null); - + return blockBlobClient.getBlobOutputStream(options); } - public CloudBlob getBlockBlobReferenceWithAttributes(final String containerName, final String blobPath) - throws URISyntaxException, StorageException + // There's no need to download attributes with the new azure clients, they will get fetched as needed. + public BlockBlobClient getBlockBlobReferenceWithAttributes(final String containerName, final String blobPath) + throws BlobStorageException { - final CloudBlockBlob blobReference = getOrCreateCloudBlobContainer(containerName).getBlockBlobReference(blobPath); - blobReference.downloadAttributes(); - return blobReference; + return getOrCreateBlobContainerClient(containerName).getBlobClient(Utility.urlEncode(blobPath)).getBlockBlobClient(); } public long getBlockBlobLength(final String containerName, final String blobPath) - throws URISyntaxException, StorageException + throws BlobStorageException { - return getBlockBlobReferenceWithAttributes(containerName, blobPath).getProperties().getLength(); + return getBlockBlobReferenceWithAttributes(containerName, blobPath).getProperties().getBlobSize(); } public InputStream getBlockBlobInputStream(final String containerName, final String blobPath) - throws URISyntaxException, StorageException + throws BlobStorageException { return getBlockBlobInputStream(0L, containerName, blobPath); } public InputStream getBlockBlobInputStream(long offset, final String containerName, final String blobPath) - throws URISyntaxException, StorageException + throws BlobStorageException { return getBlockBlobInputStream(offset, null, containerName, blobPath); } public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath) - throws URISyntaxException, StorageException + throws BlobStorageException { return getBlockBlobInputStream(offset, length, containerName, blobPath, null); } public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath, Integer maxAttempts) - throws URISyntaxException, StorageException + throws BlobStorageException { - CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName); - return container.getBlockBlobReference(blobPath) - .openInputStream(offset, length, null, getRequestOptionsWithRetry(maxAttempts), null); + BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); + return blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).openInputStream(new BlobInputStreamOptions().setRange(new BlobRange(offset, length))); } public void batchDeleteFiles(String containerName, Iterable paths, Integer maxAttempts) - throws URISyntaxException, StorageException + throws BlobBatchStorageException { - CloudBlobContainer cloudBlobContainer = getOrCreateCloudBlobContainer(containerName); - BlobDeleteBatchOperation blobDeleteBatchOperation = new BlobDeleteBatchOperation(); - for (String path : paths) { - CloudBlob blobReference = cloudBlobContainer.getBlockBlobReference(path); - blobDeleteBatchOperation.addSubOperation(blobReference); - } - cloudBlobClient.get().executeBatch(blobDeleteBatchOperation, getRequestOptionsWithRetry(maxAttempts), null); - } - public List listDir(final String containerName, final String virtualDirPath) - throws URISyntaxException, StorageException - { - return listDir(containerName, virtualDirPath, null); + BlobBatchClient blobBatchClient = new BlobBatchClientBuilder(getOrCreateBlobContainerClient(containerName, maxAttempts)).buildClient(); + blobBatchClient.deleteBlobs(Lists.newArrayList(paths), DeleteSnapshotsOptionType.ONLY); } public List listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts) - throws StorageException, URISyntaxException + throws BlobStorageException { List files = new ArrayList<>(); - CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName); + BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); - for (ListBlobItem blobItem : - container.listBlobs(virtualDirPath, USE_FLAT_BLOB_LISTING, null, getRequestOptionsWithRetry(maxAttempts), null)) { - CloudBlob cloudBlob = (CloudBlob) blobItem; - files.add(cloudBlob.getName()); - } + PagedIterable blobItems = blobContainerClient.listBlobs( + new ListBlobsOptions().setPrefix(virtualDirPath), + Duration.ofMillis(DELTA_BACKOFF_MS) + ); + + blobItems.iterableByPage().forEach(page -> page.getElements().forEach(blob -> files.add(blob.getName()))); return files; } - public boolean getBlockBlobExists(String container, String blobPath) throws URISyntaxException, StorageException + public boolean getBlockBlobExists(String container, String blobPath) throws BlobStorageException { return getBlockBlobExists(container, blobPath, null); } public boolean getBlockBlobExists(String container, String blobPath, Integer maxAttempts) - throws URISyntaxException, StorageException - { - return getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath) - .exists(null, getRequestOptionsWithRetry(maxAttempts), null); - } - - /** - * If maxAttempts is provided, this method returns request options with retry built in. - * Retry backoff is exponential backoff, with maxAttempts set to the one provided - */ - @Nullable - private BlobRequestOptions getRequestOptionsWithRetry(Integer maxAttempts) + throws BlobStorageException { - if (maxAttempts == null) { - return null; - } - BlobRequestOptions requestOptions = new BlobRequestOptions(); - requestOptions.setRetryPolicyFactory(new RetryExponentialRetry(DELTA_BACKOFF_MS, maxAttempts)); - return requestOptions; + return getOrCreateBlobContainerClient(container, maxAttempts).getBlobClient(Utility.urlEncode(blobPath)).exists(); } @VisibleForTesting - CloudBlobClient getCloudBlobClient() + BlobServiceClient getBlobServiceClient(Integer maxAttempts) { - return this.cloudBlobClient.get(); + return azureClientFactory.getBlobServiceClient(maxAttempts); } @VisibleForTesting - ResultSegment listBlobsWithPrefixInContainerSegmented( + PagedIterable listBlobsWithPrefixInContainerSegmented( final String containerName, final String prefix, - ResultContinuation continuationToken, - int maxResults - ) throws StorageException, URISyntaxException + int maxResults, + Integer maxAttempts + ) throws BlobStorageException { - CloudBlobContainer cloudBlobContainer = cloudBlobClient.get().getContainerReference(containerName); - return cloudBlobContainer - .listBlobsSegmented( - prefix, - /* Use flat blob listing here so that we get only blob types and not directories.*/ - USE_FLAT_BLOB_LISTING, - EnumSet - .noneOf(BlobListingDetails.class), - maxResults, - continuationToken, - null, - null - ); + BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); + return blobContainerClient.listBlobs( + new ListBlobsOptions().setPrefix(prefix).setMaxResultsPerPage(maxResults), + Duration.ofMillis(DELTA_BACKOFF_MS) + ); } - private CloudBlobContainer getOrCreateCloudBlobContainer(final String containerName) - throws StorageException, URISyntaxException + private BlobContainerClient getOrCreateBlobContainerClient(final String containerName) { - CloudBlobContainer cloudBlobContainer = cloudBlobClient.get().getContainerReference(containerName); - cloudBlobContainer.createIfNotExists(); + return getBlobServiceClient(null).createBlobContainerIfNotExists(containerName); + } - return cloudBlobContainer; + private BlobContainerClient getOrCreateBlobContainerClient(final String containerName, final Integer maxRetries) + { + return getBlobServiceClient(maxRetries).createBlobContainerIfNotExists(containerName); } } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java index 674e451de51a..ec4ed8fa201d 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java @@ -23,14 +23,12 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Provides; import com.google.inject.assistedinject.FactoryModuleBuilder; -import com.microsoft.azure.storage.CloudStorageAccount; -import com.microsoft.azure.storage.blob.CloudBlobClient; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.druid.data.input.azure.AzureEntityFactory; import org.apache.druid.data.input.azure.AzureInputSource; import org.apache.druid.guice.Binders; @@ -38,11 +36,7 @@ import org.apache.druid.guice.LazySingleton; import org.apache.druid.initialization.DruidModule; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory; -import java.net.URISyntaxException; -import java.security.InvalidKeyException; import java.util.List; /** @@ -114,64 +108,38 @@ public void configure(Binder binder) .build(AzureCloudBlobIteratorFactory.class)); binder.install(new FactoryModuleBuilder() .build(AzureCloudBlobIterableFactory.class)); - binder.install(new FactoryModuleBuilder() - .build(ListBlobItemHolderFactory.class)); } - /** - * Creates a supplier that lazily initialize {@link CloudBlobClient}. - * This is to avoid immediate config validation but defer it until you actually use the client. - */ + @Provides @LazySingleton - public Supplier getCloudBlobClient(final AzureAccountConfig config) + public AzureClientFactory getAzureClientFactory(final AzureAccountConfig config) { - if ((config.getKey() != null && config.getSharedAccessStorageToken() != null) - || - (config.getKey() == null && config.getSharedAccessStorageToken() == null)) { - throw new ISE("Either set 'key' or 'sharedAccessStorageToken' in the azure config but not both." - + " Please refer to azure documentation."); + if (StringUtils.isEmpty(config.getKey()) && StringUtils.isEmpty(config.getSharedAccessStorageToken()) && BooleanUtils.isNotTrue(config.getUseAzureCredentialsChain())) { + throw new ISE("Either set 'key' or 'sharedAccessStorageToken' or 'useAzureCredentialsChain' in the azure config." + + " Please refer to azure documentation."); + } + + /* Azure named keys and sas tokens are mutually exclusive with each other and with azure keychain auth, + but any form of auth supported by the DefaultAzureCredentialChain is not mutually exclusive, e.g. you can have + environment credentials or workload credentials or managed credentials using the same chain. + **/ + if (!StringUtils.isEmpty(config.getKey()) && !StringUtils.isEmpty(config.getSharedAccessStorageToken()) || + !StringUtils.isEmpty(config.getKey()) && BooleanUtils.isTrue(config.getUseAzureCredentialsChain()) || + !StringUtils.isEmpty(config.getSharedAccessStorageToken()) && BooleanUtils.isTrue(config.getUseAzureCredentialsChain()) + ) { + throw new ISE("Set only one of 'key' or 'sharedAccessStorageToken' or 'useAzureCredentialsChain' in the azure config." + + " Please refer to azure documentation."); } - return Suppliers.memoize(() -> { - try { - final CloudStorageAccount account; - if (config.getKey() != null) { - account = CloudStorageAccount.parse( - StringUtils.format( - STORAGE_CONNECTION_STRING_WITH_KEY, - config.getProtocol(), - config.getAccount(), - config.getKey() - ) - - ); - return account.createCloudBlobClient(); - } else if (config.getSharedAccessStorageToken() != null) { - account = CloudStorageAccount.parse(StringUtils.format( - STORAGE_CONNECTION_STRING_WITH_TOKEN, - config.getProtocol(), - config.getAccount(), - config.getSharedAccessStorageToken() - )); - return account.createCloudBlobClient(); - } else { - throw new ISE( - "None of 'key' or 'sharedAccessStorageToken' is set in the azure config." - + " Please refer to azure extension documentation."); - } - } - catch (URISyntaxException | InvalidKeyException e) { - throw new RuntimeException(e); - } - }); + return new AzureClientFactory(config); } @Provides @LazySingleton public AzureStorage getAzureStorageContainer( - final Supplier cloudBlobClient + final AzureClientFactory azureClientFactory ) { - return new AzureStorage(cloudBlobClient); + return new AzureStorage(azureClientFactory); } } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java index 5e6880c14ed2..edec449c43d9 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java @@ -19,9 +19,9 @@ package org.apache.druid.storage.azure; +import com.azure.storage.blob.models.BlobStorageException; import com.google.common.base.Optional; import com.google.inject.Inject; -import com.microsoft.azure.storage.StorageException; import org.apache.druid.common.utils.CurrentTimeMillisSupplier; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; @@ -31,7 +31,6 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.net.URISyntaxException; import java.util.Date; /** @@ -93,13 +92,7 @@ public void pushTaskStatus(String taskid, File statusFile) private void pushTaskFile(final File logFile, String taskKey) { try { - AzureUtils.retryAzureOperation( - () -> { - azureStorage.uploadBlockBlob(logFile, config.getContainer(), taskKey); - return null; - }, - config.getMaxTries() - ); + azureStorage.uploadBlockBlob(logFile, config.getContainer(), taskKey, accountConfig.getMaxTries()); } catch (Exception e) { throw new RuntimeException(e); @@ -153,7 +146,7 @@ private Optional streamTaskFile(final String taskid, final long off throw new IOException(e); } } - catch (StorageException | URISyntaxException e) { + catch (BlobStorageException e) { throw new IOE(e, "Failed to stream logs from: %s", taskKey); } } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogsConfig.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogsConfig.java index ea27fdc3e564..2214e043deba 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogsConfig.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogsConfig.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; -import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; /** @@ -37,19 +36,14 @@ public class AzureTaskLogsConfig @NotNull private String prefix = null; - @JsonProperty - @Min(1) - private int maxTries = 3; - public AzureTaskLogsConfig() { } - public AzureTaskLogsConfig(String container, String prefix, int maxTries) + public AzureTaskLogsConfig(String container, String prefix) { this.container = container; this.prefix = prefix; - this.maxTries = maxTries; } public String getContainer() @@ -61,9 +55,4 @@ public String getPrefix() { return prefix; } - - public int getMaxTries() - { - return maxTries; - } } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java index 63322404f08b..5309b016d5e5 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java @@ -19,20 +19,18 @@ package org.apache.druid.storage.azure; +import com.azure.storage.blob.models.BlobStorageException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; -import com.microsoft.azure.storage.StorageException; import org.apache.druid.data.input.impl.CloudObjectLocation; -import org.apache.druid.java.util.common.RetryUtils; -import org.apache.druid.java.util.common.RetryUtils.Task; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.storage.azure.blob.CloudBlobHolder; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import java.util.Iterator; +import java.util.concurrent.TimeoutException; /** * Utility class for miscellaneous things involving Azure. @@ -48,20 +46,23 @@ public class AzureUtils // (from https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-blob-storage) static final String AZURE_STORAGE_HADOOP_PROTOCOL = "wasbs"; + // This logic is copied from RequestRetryOptions in the azure client. We still need this logic because some classes like + // RetryingInputEntity need a predicate function to tell whether to retry, seperate from the Azure client retries. public static final Predicate AZURE_RETRY = e -> { if (e == null) { return false; } for (Throwable t = e; t != null; t = t.getCause()) { - if (t instanceof URISyntaxException) { - return false; + if (t instanceof BlobStorageException) { + int statusCode = ((BlobStorageException) t).getStatusCode(); + return statusCode == 429 || statusCode == 500 || statusCode == 503; } - if (t instanceof StorageException) { + if (t instanceof IOException) { return true; } - if (t instanceof IOException) { + if (t instanceof TimeoutException) { return true; } } @@ -119,7 +120,6 @@ public static void deleteObjectsInPath( String prefix, Predicate filter ) - throws Exception { AzureCloudBlobIterable azureCloudBlobIterable = azureCloudBlobIterableFactory.create(ImmutableList.of(new CloudObjectLocation( @@ -131,26 +131,8 @@ public static void deleteObjectsInPath( while (iterator.hasNext()) { final CloudBlobHolder nextObject = iterator.next(); if (filter.apply(nextObject)) { - deleteBucketKeys(storage, accountConfig.getMaxTries(), nextObject.getContainerName(), nextObject.getName()); + storage.emptyCloudBlobDirectory(nextObject.getContainerName(), nextObject.getName(), accountConfig.getMaxTries()); } } } - - private static void deleteBucketKeys( - AzureStorage storage, - int maxTries, - String bucket, - String prefix - ) throws Exception - { - AzureUtils.retryAzureOperation(() -> { - storage.emptyCloudBlobDirectory(bucket, prefix); - return null; - }, maxTries); - } - - static T retryAzureOperation(Task f, int maxTries) throws Exception - { - return RetryUtils.retry(f, AZURE_RETRY, maxTries); - } } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java index 3391c3c7dd19..d2398aebda1f 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java @@ -19,28 +19,27 @@ package org.apache.druid.storage.azure.blob; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.CloudBlob; - -import java.net.URISyntaxException; +import com.azure.storage.blob.models.BlobItem; import java.util.Date; /** - * Wrapper for {@link CloudBlob}. Used to make testing easier, since {@link CloudBlob} + * Wrapper for {@link BlobItem}. Used to make testing easier, since {@link BlobItem} * is a final class and so is difficult to mock in unit tests. */ public class CloudBlobHolder { - private final CloudBlob delegate; + private final BlobItem delegate; + private final String container; - public CloudBlobHolder(CloudBlob delegate) + public CloudBlobHolder(BlobItem delegate, String container) { this.delegate = delegate; + this.container = container; } - public String getContainerName() throws URISyntaxException, StorageException + public String getContainerName() { - return delegate.getContainer().getName(); + return container; } public String getName() @@ -50,11 +49,11 @@ public String getName() public long getBlobLength() { - return delegate.getProperties().getLength(); + return delegate.getProperties().getContentLength(); } public Date getLastModifed() { - return delegate.getProperties().getLastModified(); + return Date.from(delegate.getProperties().getLastModified().toInstant()); } } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/ListBlobItemHolder.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/ListBlobItemHolder.java deleted file mode 100644 index ee31f6cf193a..000000000000 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/ListBlobItemHolder.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.storage.azure.blob; - -import com.google.inject.assistedinject.Assisted; -import com.google.inject.assistedinject.AssistedInject; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.ListBlobItem; - -import java.net.URI; -import java.net.URISyntaxException; - -/** - * Wrapper class for {@link ListBlobItem} interface, which was missing some useful - * functionality for telling whether the blob was a cloudBlob or not. This class was - * added mainly to make testing easier. - */ -public class ListBlobItemHolder -{ - private final ListBlobItem delegate; - - @AssistedInject - public ListBlobItemHolder(@Assisted ListBlobItem delegate) - { - this.delegate = delegate; - } - - public String getContainerName() throws URISyntaxException, StorageException - { - return delegate.getContainer().getName(); - } - - public URI getUri() - { - return delegate.getUri(); - } - - public CloudBlobHolder getCloudBlob() - { - return new CloudBlobHolder((CloudBlob) delegate); - } - - public boolean isCloudBlob() - { - return delegate instanceof CloudBlob; - } - - @Override - public String toString() - { - return delegate.toString(); - } -} diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/ListBlobItemHolderFactory.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/ListBlobItemHolderFactory.java deleted file mode 100644 index a3533f72fcce..000000000000 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/ListBlobItemHolderFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.storage.azure.blob; - -import com.microsoft.azure.storage.blob.ListBlobItem; - -/** - * Factory for creating {@link ListBlobItemHolder} objects - */ -public interface ListBlobItemHolderFactory -{ - ListBlobItemHolder create(ListBlobItem blobItem); -} diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java index 657043797e03..cd93c80ba148 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java @@ -19,9 +19,9 @@ package org.apache.druid.storage.azure.output; +import com.azure.storage.blob.models.BlobStorageException; import com.google.common.base.Joiner; import com.google.common.collect.Iterables; -import com.microsoft.azure.storage.StorageException; import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction; import org.apache.druid.storage.azure.AzureStorage; import org.apache.druid.storage.azure.AzureUtils; @@ -31,7 +31,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.URISyntaxException; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -64,7 +63,7 @@ public ChunkingStorageConnectorParameters buildInputParams(Stri try { return buildInputParams(path, 0, azureStorage.getBlockBlobLength(config.getContainer(), objectPath(path))); } - catch (URISyntaxException | StorageException e) { + catch (BlobStorageException e) { throw new IOException(e); } } @@ -100,7 +99,7 @@ public InputStream open(AzureInputRange inputRange) throws IOException config.getMaxRetry() ); } - catch (URISyntaxException | StorageException e) { + catch (BlobStorageException e) { throw new IOException(e); } } @@ -128,7 +127,7 @@ public boolean pathExists(String path) throws IOException try { return azureStorage.getBlockBlobExists(config.getContainer(), objectPath(path), config.getMaxRetry()); } - catch (URISyntaxException | StorageException e) { + catch (BlobStorageException e) { throw new IOException(e); } } @@ -144,7 +143,7 @@ public OutputStream write(String path) throws IOException config.getMaxRetry() ); } - catch (URISyntaxException | StorageException e) { + catch (BlobStorageException e) { throw new IOException(e); } } @@ -159,7 +158,7 @@ public void deleteFile(String path) throws IOException config.getMaxRetry() ); } - catch (URISyntaxException | StorageException e) { + catch (BlobStorageException e) { throw new IOException(e); } } @@ -174,7 +173,7 @@ public void deleteFiles(Iterable paths) throws IOException config.getMaxRetry() ); } - catch (StorageException | URISyntaxException e) { + catch (BlobStorageException e) { throw new IOException(e); } } @@ -185,7 +184,7 @@ public void deleteRecursively(String path) throws IOException try { azureStorage.emptyCloudBlobDirectory(config.getContainer(), objectPath(path), config.getMaxRetry()); } - catch (StorageException | URISyntaxException e) { + catch (BlobStorageException e) { throw new IOException(e); } } @@ -198,7 +197,7 @@ public Iterator listDir(String dirName) throws IOException try { paths = azureStorage.listDir(config.getContainer(), prefixBasePath, config.getMaxRetry()); } - catch (StorageException | URISyntaxException e) { + catch (BlobStorageException e) { throw new IOException(e); } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java index 51518855cfc6..692c8dfeebad 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java @@ -154,7 +154,7 @@ public void test_createEntity_returnsExpectedEntity() } @Test - public void test_createSplits_successfullyCreatesCloudLocation_returnsExpectedLocations() throws Exception + public void test_createSplits_successfullyCreatesCloudLocation_returnsExpectedLocations() { List prefixes = ImmutableList.of(PREFIX_URI); List> expectedCloudLocations = ImmutableList.of(ImmutableList.of(CLOUD_OBJECT_LOCATION_1)); @@ -194,7 +194,6 @@ public void test_createSplits_successfullyCreatesCloudLocation_returnsExpectedLo @Test public void test_getPrefixesSplitStream_withObjectGlob_successfullyCreatesCloudLocation_returnsExpectedLocations() - throws Exception { List prefixes = ImmutableList.of(PREFIX_URI); List> expectedCloudLocations = ImmutableList.of(ImmutableList.of(CLOUD_OBJECT_LOCATION_1)); @@ -372,6 +371,7 @@ public void abidesEqualsContract() EqualsVerifier.forClass(AzureInputSource.class) .usingGetClass() .withPrefabValues(Logger.class, new Logger(AzureStorage.class), new Logger(AzureStorage.class)) + .withPrefabValues(AzureStorage.class, new AzureStorage(null), new AzureStorage(null)) .withNonnullFields("storage") .withNonnullFields("entityFactory") .withNonnullFields("azureCloudBlobIterableFactory") diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureByteSourceTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureByteSourceTest.java index f54ef2e40361..f8c7af2470a0 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureByteSourceTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureByteSourceTest.java @@ -19,14 +19,14 @@ package org.apache.druid.storage.azure; -import com.microsoft.azure.storage.StorageException; +import com.azure.core.http.HttpResponse; +import com.azure.storage.blob.models.BlobStorageException; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.junit.Test; import java.io.IOException; import java.io.InputStream; -import java.net.URISyntaxException; public class AzureByteSourceTest extends EasyMockSupport { @@ -34,7 +34,7 @@ public class AzureByteSourceTest extends EasyMockSupport private static final long OFFSET = 10L; @Test - public void test_openStream_withoutOffset_succeeds() throws IOException, URISyntaxException, StorageException + public void test_openStream_withoutOffset_succeeds() throws IOException, BlobStorageException { final String containerName = "container"; final String blobPath = "/path/to/file"; @@ -53,7 +53,7 @@ public void test_openStream_withoutOffset_succeeds() throws IOException, URISynt } @Test - public void test_openStream_withOffset_succeeds() throws IOException, URISyntaxException, StorageException + public void test_openStream_withOffset_succeeds() throws IOException, BlobStorageException { final String containerName = "container"; final String blobPath = "/path/to/file"; @@ -72,23 +72,23 @@ public void test_openStream_withOffset_succeeds() throws IOException, URISyntaxE } @Test(expected = IOException.class) - public void openStreamWithRecoverableErrorTest() throws URISyntaxException, StorageException, IOException + public void openStreamWithRecoverableErrorTest() throws BlobStorageException, IOException { final String containerName = "container"; final String blobPath = "/path/to/file"; AzureStorage azureStorage = createMock(AzureStorage.class); - + HttpResponse httpResponse = createMock(HttpResponse.class); + EasyMock.expect(httpResponse.getStatusCode()).andReturn(500).anyTimes(); + EasyMock.replay(httpResponse); EasyMock.expect(azureStorage.getBlockBlobInputStream(NO_OFFSET, containerName, blobPath)).andThrow( - new StorageException( - "", + new BlobStorageException( "", - 500, - null, + httpResponse, null ) ); - replayAll(); + EasyMock.replay(azureStorage); AzureByteSource byteSource = new AzureByteSource(azureStorage, containerName, blobPath); diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java new file mode 100644 index 000000000000..ffc4a8bb8013 --- /dev/null +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.azure; + +import com.azure.core.http.policy.AzureSasCredentialPolicy; +import com.azure.core.http.policy.BearerTokenAuthenticationPolicy; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.common.StorageSharedKeyCredential; +import com.google.common.collect.ImmutableMap; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.net.MalformedURLException; +import java.net.URL; + +public class AzureClientFactoryTest +{ + private AzureClientFactory azureClientFactory; + private static final String ACCOUNT = "account"; + + @Test + public void test_blobServiceClient_accountName() + { + AzureAccountConfig config = new AzureAccountConfig(); + azureClientFactory = new AzureClientFactory(config); + config.setAccount(ACCOUNT); + BlobServiceClient blobServiceClient = azureClientFactory.getBlobServiceClient(null); + Assert.assertEquals(ACCOUNT, blobServiceClient.getAccountName()); + } + + @Test + public void test_blobServiceClientBuilder_key() throws MalformedURLException + { + AzureAccountConfig config = new AzureAccountConfig(); + config.setKey("key"); + config.setAccount(ACCOUNT); + azureClientFactory = new AzureClientFactory(config); + BlobServiceClient blobServiceClient = azureClientFactory.getBlobServiceClient(null); + StorageSharedKeyCredential storageSharedKeyCredential = StorageSharedKeyCredential.getSharedKeyCredentialFromPipeline( + blobServiceClient.getHttpPipeline() + ); + Assert.assertNotNull(storageSharedKeyCredential); + + // Azure doesn't let us look at the key in the StorageSharedKeyCredential so make sure the authorization header generated is what we expect. + Assert.assertEquals( + new StorageSharedKeyCredential(ACCOUNT, "key").generateAuthorizationHeader(new URL("http://druid.com"), "POST", ImmutableMap.of()), + storageSharedKeyCredential.generateAuthorizationHeader(new URL("http://druid.com"), "POST", ImmutableMap.of()) + ); + } + + @Test + public void test_blobServiceClientBuilder_sasToken() + { + AzureAccountConfig config = new AzureAccountConfig(); + config.setSharedAccessStorageToken("sasToken"); + config.setAccount(ACCOUNT); + azureClientFactory = new AzureClientFactory(config); + BlobServiceClient blobServiceClient = azureClientFactory.getBlobServiceClient(null); + AzureSasCredentialPolicy azureSasCredentialPolicy = null; + for (int i = 0; i < blobServiceClient.getHttpPipeline().getPolicyCount(); i++) { + if (blobServiceClient.getHttpPipeline().getPolicy(i) instanceof AzureSasCredentialPolicy) { + azureSasCredentialPolicy = (AzureSasCredentialPolicy) blobServiceClient.getHttpPipeline().getPolicy(i); + } + } + + Assert.assertNotNull(azureSasCredentialPolicy); + } + + @Test + public void test_blobServiceClientBuilder_useDefaultCredentialChain() + { + AzureAccountConfig config = new AzureAccountConfig(); + config.setUseAzureCredentialsChain(true); + config.setAccount(ACCOUNT); + azureClientFactory = new AzureClientFactory(config); + BlobServiceClient blobServiceClient = azureClientFactory.getBlobServiceClient(null); + BearerTokenAuthenticationPolicy bearerTokenAuthenticationPolicy = null; + for (int i = 0; i < blobServiceClient.getHttpPipeline().getPolicyCount(); i++) { + if (blobServiceClient.getHttpPipeline().getPolicy(i) instanceof BearerTokenAuthenticationPolicy) { + bearerTokenAuthenticationPolicy = (BearerTokenAuthenticationPolicy) blobServiceClient.getHttpPipeline().getPolicy(i); + } + } + + Assert.assertNotNull(bearerTokenAuthenticationPolicy); + } + + @Test + public void test_blobServiceClientBuilder_useCachedClient() + { + AzureAccountConfig config = new AzureAccountConfig(); + config.setUseAzureCredentialsChain(true); + config.setAccount(ACCOUNT); + azureClientFactory = new AzureClientFactory(config); + BlobServiceClient blobServiceClient = azureClientFactory.getBlobServiceClient(null); + BlobServiceClient blobServiceClient2 = azureClientFactory.getBlobServiceClient(null); + Assert.assertEquals(blobServiceClient, blobServiceClient2); + } + + @Test + public void test_blobServiceClientBuilder_useNewClientForDifferentRetryCount() + { + AzureAccountConfig config = new AzureAccountConfig(); + config.setUseAzureCredentialsChain(true); + config.setAccount(ACCOUNT); + azureClientFactory = new AzureClientFactory(config); + BlobServiceClient blobServiceClient = azureClientFactory.getBlobServiceClient(null); + BlobServiceClient blobServiceClient2 = azureClientFactory.getBlobServiceClient(1); + Assert.assertNotEquals(blobServiceClient, blobServiceClient2); + } + + @Test + public void test_blobServiceClientBuilder_useAzureAccountConfig_asDefaultMaxTries() + { + AzureAccountConfig config = EasyMock.createMock(AzureAccountConfig.class); + EasyMock.expect(config.getKey()).andReturn("key").times(2); + EasyMock.expect(config.getAccount()).andReturn(ACCOUNT).times(2); + EasyMock.expect(config.getMaxTries()).andReturn(3); + azureClientFactory = new AzureClientFactory(config); + EasyMock.replay(config); + azureClientFactory.getBlobServiceClient(null); + EasyMock.verify(config); + } +} diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java index 22bbdba158b1..d66c3c211566 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java @@ -19,132 +19,46 @@ package org.apache.druid.storage.azure; +import com.azure.core.http.rest.PagedIterable; +import com.azure.core.http.rest.PagedResponse; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.models.BlobItemProperties; +import com.azure.storage.blob.models.BlobStorageException; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.microsoft.azure.storage.ResultContinuation; -import com.microsoft.azure.storage.ResultSegment; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.ListBlobItem; +import org.apache.druid.common.guava.SettableSupplier; import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.storage.azure.blob.CloudBlobHolder; -import org.apache.druid.storage.azure.blob.ListBlobItemHolder; -import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory; import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; -import org.junit.After; +import org.easymock.Mock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; - -import java.io.IOException; +import org.junit.runner.RunWith; import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; +import java.util.stream.Collectors; +@RunWith(EasyMockRunner.class) public class AzureCloudBlobIteratorTest extends EasyMockSupport { - private static final String AZURE = "azure"; - private static final String CONTAINER1 = "container1"; - private static final String PREFIX_ONLY_CLOUD_BLOBS = "prefixOnlyCloudBlobs"; - private static final String PREFIX_WITH_NO_BLOBS = "prefixWithNoBlobs"; - private static final String PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES = "prefixWithCloudBlobsAndDirectories"; - private static final URI PREFIX_ONLY_CLOUD_BLOBS_URI; - private static final URI PREFIX_WITH_NO_BLOBS_URI; - private static final URI PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES_URI; - private static final List EMPTY_URI_PREFIXES = ImmutableList.of(); - private static final List PREFIXES; - private static final int MAX_LISTING_LENGTH = 10; - private static final int MAX_TRIES = 2; - private static final StorageException RETRYABLE_EXCEPTION = new StorageException("", "", new IOException()); - private static final URISyntaxException NON_RETRYABLE_EXCEPTION = new URISyntaxException("", ""); - + @Mock private AzureStorage storage; - private ListBlobItemHolderFactory blobItemDruidFactory; - private AzureAccountConfig config; - private ResultSegment resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1; - private ResultSegment resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2; - private ResultSegment resultSegmentPrefixWithNoBlobs; - private ResultSegment resultSegmentPrefixWithCloudBlobsAndDirectories; - - private ResultContinuation resultContinuationPrefixOnlyCloudBlobs = new ResultContinuation(); - private ResultContinuation nullResultContinuationToken = null; - - private ListBlobItem blobItemPrefixWithOnlyCloudBlobs1; - private ListBlobItemHolder cloudBlobItemPrefixWithOnlyCloudBlobs1; - private CloudBlobHolder cloudBlobDruidPrefixWithOnlyCloudBlobs1; - - private ListBlobItem blobItemPrefixWithOnlyCloudBlobs2; - private ListBlobItemHolder cloudBlobItemPrefixWithOnlyCloudBlobs2; - private CloudBlobHolder cloudBlobDruidPrefixWithOnlyCloudBlobs2; - - private ListBlobItem blobItemPrefixWithCloudBlobsAndDirectories1; - private ListBlobItemHolder directoryItemPrefixWithCloudBlobsAndDirectories; - - private ListBlobItem blobItemPrefixWithCloudBlobsAndDirectories2; - private ListBlobItemHolder cloudBlobItemPrefixWithCloudBlobsAndDirectories; - private CloudBlobHolder cloudBlobDruidPrefixWithCloudBlobsAndDirectories; - - private ListBlobItem blobItemPrefixWithCloudBlobsAndDirectories3; - private ListBlobItemHolder directoryItemPrefixWithCloudBlobsAndDirectories3; - private AzureCloudBlobIterator azureCloudBlobIterator; - - static { - try { - PREFIX_ONLY_CLOUD_BLOBS_URI = new URI(AZURE + "://" + CONTAINER1 + "/" + PREFIX_ONLY_CLOUD_BLOBS); - PREFIX_WITH_NO_BLOBS_URI = new URI(AZURE + "://" + CONTAINER1 + "/" + PREFIX_WITH_NO_BLOBS); - PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES_URI = new URI(AZURE - + "://" - + CONTAINER1 - + "/" - + PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES); - PREFIXES = ImmutableList.of( - PREFIX_ONLY_CLOUD_BLOBS_URI, - PREFIX_WITH_NO_BLOBS_URI, - PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES_URI - ); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } + private final AzureAccountConfig config = new AzureAccountConfig(); + private final Integer MAX_TRIES = 3; + private final Integer MAX_LISTING_LENGTH = 10; + private final String CONTAINER = "container"; @Before public void setup() { - storage = createMock(AzureStorage.class); - config = createMock(AzureAccountConfig.class); - blobItemDruidFactory = createMock(ListBlobItemHolderFactory.class); - - resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1 = createMock(ResultSegment.class); - resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2 = createMock(ResultSegment.class); - resultSegmentPrefixWithNoBlobs = createMock(ResultSegment.class); - resultSegmentPrefixWithCloudBlobsAndDirectories = createMock(ResultSegment.class); - cloudBlobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItemHolder.class); - - blobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItem.class); - cloudBlobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItemHolder.class); - cloudBlobDruidPrefixWithOnlyCloudBlobs1 = createMock(CloudBlobHolder.class); - EasyMock.expect(cloudBlobDruidPrefixWithOnlyCloudBlobs1.getBlobLength()).andReturn(10L).anyTimes(); - - blobItemPrefixWithOnlyCloudBlobs2 = createMock(ListBlobItem.class); - cloudBlobItemPrefixWithOnlyCloudBlobs2 = createMock(ListBlobItemHolder.class); - cloudBlobDruidPrefixWithOnlyCloudBlobs2 = createMock(CloudBlobHolder.class); - EasyMock.expect(cloudBlobDruidPrefixWithOnlyCloudBlobs2.getBlobLength()).andReturn(10L).anyTimes(); - - blobItemPrefixWithCloudBlobsAndDirectories1 = createMock(ListBlobItem.class); - directoryItemPrefixWithCloudBlobsAndDirectories = createMock(ListBlobItemHolder.class); - - blobItemPrefixWithCloudBlobsAndDirectories2 = createMock(ListBlobItem.class); - cloudBlobItemPrefixWithCloudBlobsAndDirectories = createMock(ListBlobItemHolder.class); - cloudBlobDruidPrefixWithCloudBlobsAndDirectories = createMock(CloudBlobHolder.class); - EasyMock.expect(cloudBlobDruidPrefixWithCloudBlobsAndDirectories.getBlobLength()).andReturn(10L).anyTimes(); - - blobItemPrefixWithCloudBlobsAndDirectories3 = createMock(ListBlobItem.class); - directoryItemPrefixWithCloudBlobsAndDirectories3 = createMock(ListBlobItemHolder.class); + config.setMaxTries(MAX_TRIES); } @Test @@ -152,10 +66,9 @@ public void test_hasNext_noBlobs_returnsFalse() { azureCloudBlobIterator = new AzureCloudBlobIterator( storage, - blobItemDruidFactory, config, - EMPTY_URI_PREFIXES, - MAX_LISTING_LENGTH + ImmutableList.of(), + 1 ); boolean hasNext = azureCloudBlobIterator.hasNext(); Assert.assertFalse(hasNext); @@ -164,168 +77,83 @@ public void test_hasNext_noBlobs_returnsFalse() @Test public void test_next_prefixesWithMultipleBlobsAndSomeDirectories_returnsExpectedBlobs() throws Exception { - EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); - EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.isCloudBlob()).andReturn(true); - EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.getCloudBlob()).andReturn( - cloudBlobDruidPrefixWithOnlyCloudBlobs1).anyTimes(); - EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs1)).andReturn( - cloudBlobItemPrefixWithOnlyCloudBlobs1); - - EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs2.isCloudBlob()).andReturn(true); - EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs2.getCloudBlob()).andReturn( - cloudBlobDruidPrefixWithOnlyCloudBlobs2).anyTimes(); - EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs2)).andReturn( - cloudBlobItemPrefixWithOnlyCloudBlobs2); - - EasyMock.expect(directoryItemPrefixWithCloudBlobsAndDirectories.isCloudBlob()).andReturn(false); - EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithCloudBlobsAndDirectories1)).andReturn( - directoryItemPrefixWithCloudBlobsAndDirectories); - - EasyMock.expect(cloudBlobItemPrefixWithCloudBlobsAndDirectories.isCloudBlob()).andReturn(true); - EasyMock.expect(cloudBlobItemPrefixWithCloudBlobsAndDirectories.getCloudBlob()).andReturn( - cloudBlobDruidPrefixWithCloudBlobsAndDirectories).anyTimes(); - EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithCloudBlobsAndDirectories2)).andReturn( - cloudBlobItemPrefixWithCloudBlobsAndDirectories); - - EasyMock.expect(directoryItemPrefixWithCloudBlobsAndDirectories3.isCloudBlob()).andReturn(false); - EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithCloudBlobsAndDirectories3)).andReturn( - directoryItemPrefixWithCloudBlobsAndDirectories3); - - ArrayList resultBlobItemsPrefixWithOnlyCloudBlobs1 = new ArrayList<>(); - resultBlobItemsPrefixWithOnlyCloudBlobs1.add(blobItemPrefixWithOnlyCloudBlobs1); - ArrayList resultBlobItemsPrefixWithOnlyCloudBlobs2 = new ArrayList<>(); - resultBlobItemsPrefixWithOnlyCloudBlobs2.add(blobItemPrefixWithOnlyCloudBlobs2); - ArrayList resultBlobItemsPrefixWithNoBlobs = new ArrayList<>(); - ArrayList resultBlobItemsPrefixWithCloudBlobsAndDirectories = new ArrayList<>(); - resultBlobItemsPrefixWithCloudBlobsAndDirectories.add(blobItemPrefixWithCloudBlobsAndDirectories1); - resultBlobItemsPrefixWithCloudBlobsAndDirectories.add(blobItemPrefixWithCloudBlobsAndDirectories2); - resultBlobItemsPrefixWithCloudBlobsAndDirectories.add(blobItemPrefixWithCloudBlobsAndDirectories3); - EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getContinuationToken()) - .andReturn(resultContinuationPrefixOnlyCloudBlobs); - EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getResults()) - .andReturn(resultBlobItemsPrefixWithOnlyCloudBlobs1); - - EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2.getContinuationToken()).andReturn(nullResultContinuationToken); - EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2.getResults()) - .andReturn(resultBlobItemsPrefixWithOnlyCloudBlobs2); - - EasyMock.expect(resultSegmentPrefixWithNoBlobs.getContinuationToken()).andReturn(nullResultContinuationToken); - EasyMock.expect(resultSegmentPrefixWithNoBlobs.getResults()).andReturn(resultBlobItemsPrefixWithNoBlobs); - - EasyMock.expect(resultSegmentPrefixWithCloudBlobsAndDirectories.getContinuationToken()) - .andReturn(nullResultContinuationToken); - EasyMock.expect(resultSegmentPrefixWithCloudBlobsAndDirectories.getResults()) - .andReturn(resultBlobItemsPrefixWithCloudBlobsAndDirectories); - - EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented( - CONTAINER1, - PREFIX_ONLY_CLOUD_BLOBS, - nullResultContinuationToken, - MAX_LISTING_LENGTH - )).andThrow(RETRYABLE_EXCEPTION); - - EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented( - CONTAINER1, - PREFIX_ONLY_CLOUD_BLOBS, - nullResultContinuationToken, - MAX_LISTING_LENGTH - )).andReturn(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1); - - - EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented( - CONTAINER1, - PREFIX_ONLY_CLOUD_BLOBS, - resultContinuationPrefixOnlyCloudBlobs, - MAX_LISTING_LENGTH - )).andReturn(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2); - - EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented( - CONTAINER1, - PREFIX_WITH_NO_BLOBS, - nullResultContinuationToken, - MAX_LISTING_LENGTH - )).andReturn(resultSegmentPrefixWithNoBlobs); + List prefixes = ImmutableList.of( + new URI(StringUtils.format("azure://%s/dir1", CONTAINER)), + new URI(StringUtils.format("azure://%s/dir2", CONTAINER)) + ); - EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented( - CONTAINER1, - PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES, - nullResultContinuationToken, - MAX_LISTING_LENGTH - )).andReturn(resultSegmentPrefixWithCloudBlobsAndDirectories); + BlobItem blobItem = new BlobItem().setName("blobName").setProperties(new BlobItemProperties().setContentLength(10L)); + SettableSupplier> supplier = new SettableSupplier<>(); + supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem))); + PagedIterable pagedIterable = new PagedIterable<>(supplier); + EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(CONTAINER, "dir1", MAX_LISTING_LENGTH, MAX_TRIES)) + .andReturn(pagedIterable); + + BlobItem blobPrefixItem = new BlobItem().setIsPrefix(true).setName("subdir").setProperties(new BlobItemProperties()); + BlobItem blobItem2 = new BlobItem().setName("blobName2").setProperties(new BlobItemProperties().setContentLength(10L)); + SettableSupplier> supplier2 = new SettableSupplier<>(); + supplier2.set(new TestPagedResponse<>(ImmutableList.of(blobPrefixItem, blobItem2))); + PagedIterable pagedIterable2 = new PagedIterable<>(supplier2); + EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(CONTAINER, "dir2", MAX_LISTING_LENGTH, MAX_TRIES)) + .andReturn(pagedIterable2); replayAll(); - azureCloudBlobIterator = new AzureCloudBlobIterator( storage, - blobItemDruidFactory, config, - PREFIXES, + prefixes, MAX_LISTING_LENGTH ); - - List expectedBlobItems = ImmutableList.of( - cloudBlobDruidPrefixWithOnlyCloudBlobs1, - cloudBlobDruidPrefixWithOnlyCloudBlobs2, - cloudBlobDruidPrefixWithCloudBlobsAndDirectories - ); List actualBlobItems = new ArrayList<>(); while (azureCloudBlobIterator.hasNext()) { actualBlobItems.add(azureCloudBlobIterator.next()); } - Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size()); - Assert.assertTrue(expectedBlobItems.containsAll(actualBlobItems)); verifyAll(); + List expectedBlobItems = ImmutableList.of( + new CloudBlobHolder(blobItem, CONTAINER), + new CloudBlobHolder(blobItem2, CONTAINER) + ); + Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size()); + Assert.assertEquals( + expectedBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()), + actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet())); } @Test - public void test_next_emptyObjects_skipEmptyObjects() throws URISyntaxException, StorageException + public void test_next_emptyObjects_skipEmptyObjects() throws Exception { - EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); - EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.isCloudBlob()).andReturn(true); - EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.getCloudBlob()).andReturn( - cloudBlobDruidPrefixWithOnlyCloudBlobs1).anyTimes(); - EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs1)).andReturn( - cloudBlobItemPrefixWithOnlyCloudBlobs1); - - ListBlobItem emptyBlobItem = createMock(ListBlobItem.class); - ListBlobItemHolder emptyBlobItemHolder = createMock(ListBlobItemHolder.class); - CloudBlobHolder emptyBlobHolder = createMock(CloudBlobHolder.class); - EasyMock.expect(emptyBlobHolder.getBlobLength()).andReturn(0L).anyTimes(); - EasyMock.expect(emptyBlobItemHolder.isCloudBlob()).andReturn(true); - EasyMock.expect(emptyBlobItemHolder.getCloudBlob()).andReturn(emptyBlobHolder).anyTimes(); - - EasyMock.expect(blobItemDruidFactory.create(emptyBlobItem)).andReturn(emptyBlobItemHolder); + List prefixes = ImmutableList.of( + new URI(StringUtils.format("azure://%s/dir1", CONTAINER)) + ); - EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented( - CONTAINER1, - PREFIX_ONLY_CLOUD_BLOBS, - nullResultContinuationToken, - MAX_LISTING_LENGTH - )).andReturn(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1); + BlobItem blobItem = new BlobItem().setName("blobName").setProperties(new BlobItemProperties().setContentLength(10L)); + BlobItem blobItem2 = new BlobItem().setName("blobName2").setProperties(new BlobItemProperties().setContentLength(0L)); - EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getContinuationToken()) - .andReturn(nullResultContinuationToken); - ArrayList resultBlobItemsPrefixWithOnlyCloudBlobs1 = new ArrayList<>(); - resultBlobItemsPrefixWithOnlyCloudBlobs1.add(blobItemPrefixWithOnlyCloudBlobs1); - resultBlobItemsPrefixWithOnlyCloudBlobs1.add(emptyBlobItem); - EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getResults()) - .andReturn(resultBlobItemsPrefixWithOnlyCloudBlobs1); + SettableSupplier> supplier = new SettableSupplier<>(); + supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem, blobItem2))); + PagedIterable pagedIterable = new PagedIterable<>(supplier); + EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(CONTAINER, "dir1", MAX_LISTING_LENGTH, MAX_TRIES)) + .andReturn(pagedIterable); replayAll(); - azureCloudBlobIterator = new AzureCloudBlobIterator( storage, - blobItemDruidFactory, config, - ImmutableList.of(PREFIX_ONLY_CLOUD_BLOBS_URI), + prefixes, MAX_LISTING_LENGTH ); - - List expectedBlobItems = ImmutableList.of(cloudBlobDruidPrefixWithOnlyCloudBlobs1); - List actualBlobItems = Lists.newArrayList(azureCloudBlobIterator); - Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size()); - Assert.assertTrue(expectedBlobItems.containsAll(actualBlobItems)); + List actualBlobItems = new ArrayList<>(); + while (azureCloudBlobIterator.hasNext()) { + actualBlobItems.add(azureCloudBlobIterator.next()); + } verifyAll(); + List expectedBlobItems = ImmutableList.of( + new CloudBlobHolder(blobItem, CONTAINER) + ); + Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size()); + Assert.assertEquals( + expectedBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()), + actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet())); } @Test(expected = NoSuchElementException.class) @@ -333,9 +161,8 @@ public void test_next_emptyPrefixes_throwsNoSuchElementException() { azureCloudBlobIterator = new AzureCloudBlobIterator( storage, - blobItemDruidFactory, config, - EMPTY_URI_PREFIXES, + ImmutableList.of(), MAX_LISTING_LENGTH ); azureCloudBlobIterator.next(); @@ -344,50 +171,46 @@ public void test_next_emptyPrefixes_throwsNoSuchElementException() @Test(expected = RE.class) public void test_fetchNextBatch_moreThanMaxTriesRetryableExceptionsThrownInStorage_throwsREException() throws Exception { - EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); - EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented( - EasyMock.anyString(), - EasyMock.anyString(), - EasyMock.anyObject(), - EasyMock.anyInt() - )).andThrow(RETRYABLE_EXCEPTION); + List prefixes = ImmutableList.of( + new URI(StringUtils.format("azure://%s/dir1", CONTAINER)) + ); + EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented( EasyMock.anyString(), EasyMock.anyString(), - EasyMock.anyObject(), + EasyMock.anyInt(), EasyMock.anyInt() - )).andThrow(RETRYABLE_EXCEPTION); + )).andThrow(new BlobStorageException("", null, null)).times(3); + + replayAll(); azureCloudBlobIterator = new AzureCloudBlobIterator( storage, - blobItemDruidFactory, config, - PREFIXES, + prefixes, MAX_LISTING_LENGTH ); + verifyAll(); } @Test(expected = RE.class) public void test_fetchNextBatch_nonRetryableExceptionThrownInStorage_throwsREException() throws Exception { - EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + List prefixes = ImmutableList.of( + new URI(StringUtils.format("azure://%s/dir1", CONTAINER)) + ); EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented( EasyMock.anyString(), EasyMock.anyString(), - EasyMock.anyObject(), + EasyMock.anyInt(), EasyMock.anyInt() - )).andThrow(NON_RETRYABLE_EXCEPTION); + )).andThrow(new RuntimeException("")); + replayAll(); azureCloudBlobIterator = new AzureCloudBlobIterator( storage, - blobItemDruidFactory, config, - PREFIXES, + prefixes, MAX_LISTING_LENGTH ); - } - - @After - public void cleanup() - { - resetAll(); + verifyAll(); } } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java index e031015168d8..2c24ea23e980 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java @@ -19,10 +19,9 @@ package org.apache.druid.storage.azure; +import com.azure.storage.blob.models.BlobStorageException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.StorageExtendedErrorInformation; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; @@ -38,7 +37,6 @@ import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; @@ -48,18 +46,17 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport private static final String CONTAINER_NAME = "container"; private static final String CONTAINER = "test"; private static final String PREFIX = "test/log"; - private static final int MAX_TRIES = 3; private static final String BLOB_PATH = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip"; private static final int MAX_KEYS = 1; + private static final int MAX_TRIES = 3; + private static final long TIME_0 = 0L; private static final long TIME_1 = 1L; - private static final long TIME_NOW = 2L; - private static final long TIME_FUTURE = 3L; private static final String KEY_1 = "key1"; private static final String KEY_2 = "key2"; private static final URI PREFIX_URI = URI.create(StringUtils.format("azure://%s/%s", CONTAINER, PREFIX)); - private static final Exception RECOVERABLE_EXCEPTION = new StorageException("", "", null); - private static final Exception NON_RECOVERABLE_EXCEPTION = new URISyntaxException("", ""); + // BlobStorageException is not recoverable since the client attempts retries on it internally + private static final Exception NON_RECOVERABLE_EXCEPTION = new BlobStorageException("", null, null); private static final DataSegment DATA_SEGMENT = new DataSegment( "test", @@ -73,9 +70,6 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport 1 ); - private static final StorageExtendedErrorInformation NULL_STORAGE_EXTENDED_ERROR_INFORMATION = null; - private static final StorageExtendedErrorInformation STORAGE_EXTENDED_ERROR_INFORMATION = new StorageExtendedErrorInformation(); - private AzureDataSegmentConfig segmentConfig; private AzureInputDataConfig inputDataConfig; private AzureAccountConfig accountConfig; @@ -93,7 +87,7 @@ public void before() } @Test - public void killTest() throws SegmentLoadingException, URISyntaxException, StorageException + public void killTest() throws SegmentLoadingException, BlobStorageException { List deletedFiles = new ArrayList<>(); @@ -112,30 +106,29 @@ public void killTest() throws SegmentLoadingException, URISyntaxException, Stora @Test(expected = SegmentLoadingException.class) public void test_kill_StorageExceptionExtendedErrorInformationNull_throwsException() - throws SegmentLoadingException, URISyntaxException, StorageException + throws SegmentLoadingException, BlobStorageException { - common_test_kill_StorageExceptionExtendedError_throwsException(NULL_STORAGE_EXTENDED_ERROR_INFORMATION); + common_test_kill_StorageExceptionExtendedError_throwsException(); } @Test(expected = SegmentLoadingException.class) public void test_kill_StorageExceptionExtendedErrorInformationNotNull_throwsException() - throws SegmentLoadingException, URISyntaxException, StorageException + throws SegmentLoadingException, BlobStorageException { - common_test_kill_StorageExceptionExtendedError_throwsException(STORAGE_EXTENDED_ERROR_INFORMATION); + common_test_kill_StorageExceptionExtendedError_throwsException(); } - @Test(expected = SegmentLoadingException.class) - public void test_kill_URISyntaxException_throwsException() - throws SegmentLoadingException, URISyntaxException, StorageException + @Test(expected = RuntimeException.class) + public void test_kill_runtimeException_throwsException() + throws SegmentLoadingException, BlobStorageException { String dirPath = Paths.get(BLOB_PATH).getParent().toString(); EasyMock.expect(azureStorage.emptyCloudBlobDirectory(CONTAINER_NAME, dirPath)).andThrow( - new URISyntaxException( - "", + new RuntimeException( "" ) ); @@ -182,7 +175,7 @@ public void test_killAll_noException_deletesAllSegments() throws Exception EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce(); EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce(); EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes(); CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_1); @@ -197,7 +190,9 @@ public void test_killAll_noException_deletesAllSegments() throws Exception AzureTestUtils.expectDeleteObjects( azureStorage, ImmutableList.of(object1, object2), - ImmutableMap.of()); + ImmutableMap.of(), + MAX_TRIES + ); EasyMock.replay(segmentConfig, inputDataConfig, accountConfig, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory); killer.killAll(); @@ -205,34 +200,7 @@ public void test_killAll_noException_deletesAllSegments() throws Exception } @Test - public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllSegments() throws Exception - { - EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce(); - EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce(); - EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); - - CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); - - AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects( - azureCloudBlobIterableFactory, - MAX_KEYS, - PREFIX_URI, - ImmutableList.of(object1)); - - EasyMock.replay(object1); - AzureTestUtils.expectDeleteObjects( - azureStorage, - ImmutableList.of(object1), - ImmutableMap.of(object1, RECOVERABLE_EXCEPTION)); - EasyMock.replay(segmentConfig, inputDataConfig, accountConfig, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); - AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory); - killer.killAll(); - EasyMock.verify(segmentConfig, inputDataConfig, accountConfig, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); - } - - @Test - public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSegments() throws Exception + public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSegments() { boolean ioExceptionThrown = false; CloudBlobHolder object1 = null; @@ -241,7 +209,7 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSeg EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce(); EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce(); EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes(); object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); @@ -256,7 +224,8 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSeg AzureTestUtils.expectDeleteObjects( azureStorage, ImmutableList.of(), - ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION) + ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION), + MAX_TRIES ); EasyMock.replay( segmentConfig, @@ -292,17 +261,15 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSeg ); } - private void common_test_kill_StorageExceptionExtendedError_throwsException(StorageExtendedErrorInformation storageExtendedErrorInformation) - throws SegmentLoadingException, URISyntaxException, StorageException + private void common_test_kill_StorageExceptionExtendedError_throwsException() + throws SegmentLoadingException, BlobStorageException { String dirPath = Paths.get(BLOB_PATH).getParent().toString(); EasyMock.expect(azureStorage.emptyCloudBlobDirectory(CONTAINER_NAME, dirPath)).andThrow( - new StorageException( - "", + new BlobStorageException( "", - 400, - storageExtendedErrorInformation, + null, null ) ); diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java index 13820072cb7b..b3b67c4f13b4 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java @@ -19,7 +19,8 @@ package org.apache.druid.storage.azure; -import com.microsoft.azure.storage.StorageException; +import com.azure.core.http.HttpResponse; +import com.azure.storage.blob.models.BlobStorageException; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.segment.loading.SegmentLoadingException; import org.easymock.EasyMock; @@ -32,7 +33,6 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.net.URISyntaxException; public class AzureDataSegmentPullerTest extends EasyMockSupport { @@ -53,7 +53,7 @@ public void before() @Test public void test_getSegmentFiles_success() - throws SegmentLoadingException, URISyntaxException, StorageException, IOException + throws SegmentLoadingException, BlobStorageException, IOException { final String value = "bucket"; final File pulledFile = AzureTestUtils.createZipTempFile(SEGMENT_FILE_NAME, value); @@ -85,7 +85,7 @@ public void test_getSegmentFiles_success() @Test public void test_getSegmentFiles_blobPathIsHadoop_success() - throws SegmentLoadingException, URISyntaxException, StorageException, IOException + throws SegmentLoadingException, BlobStorageException, IOException { final String value = "bucket"; final File pulledFile = AzureTestUtils.createZipTempFile(SEGMENT_FILE_NAME, value); @@ -117,17 +117,15 @@ public void test_getSegmentFiles_blobPathIsHadoop_success() @Test(expected = RuntimeException.class) public void test_getSegmentFiles_nonRecoverableErrorRaisedWhenPullingSegmentFiles_doNotDeleteOutputDirectory() - throws IOException, URISyntaxException, StorageException, SegmentLoadingException + throws IOException, BlobStorageException, SegmentLoadingException { final File outDir = FileUtils.createTempDir(); try { EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH)); EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andThrow( - new URISyntaxException( - "error", - "error", - 404 + new RuntimeException( + "error" ) ); @@ -149,17 +147,21 @@ public void test_getSegmentFiles_nonRecoverableErrorRaisedWhenPullingSegmentFile @Test(expected = SegmentLoadingException.class) public void test_getSegmentFiles_recoverableErrorRaisedWhenPullingSegmentFiles_deleteOutputDirectory() - throws IOException, URISyntaxException, StorageException, SegmentLoadingException + throws IOException, BlobStorageException, SegmentLoadingException { final File outDir = FileUtils.createTempDir(); try { + HttpResponse httpResponse = createMock(HttpResponse.class); + EasyMock.expect(httpResponse.getStatusCode()).andReturn(500).anyTimes(); + EasyMock.replay(httpResponse); EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH)); EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andThrow( - new StorageException(null, null, 0, null, null) + new BlobStorageException("", httpResponse, null) ).atLeastOnce(); - replayAll(); + EasyMock.replay(azureStorage); + EasyMock.replay(byteSourceFactory); AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory); diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java index b18fabbc3dae..2c8e357b2c5a 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java @@ -19,9 +19,9 @@ package org.apache.druid.storage.azure; +import com.azure.storage.blob.models.BlobStorageException; import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; -import com.microsoft.azure.storage.StorageException; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.java.util.common.StringUtils; @@ -37,7 +37,6 @@ import java.io.File; import java.io.IOException; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -69,6 +68,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport private static final String UNIQUE_MATCHER_PREFIX = PREFIX + "/" + UNIQUE_MATCHER_NO_PREFIX; private static final String NON_UNIQUE_NO_PREFIX_MATCHER = "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip"; private static final String NON_UNIQUE_WITH_PREFIX_MATCHER = PREFIX + "/" + "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip"; + private static final int MAX_TRIES = 3; private static final DataSegment SEGMENT_TO_PUSH = new DataSegment( "foo", @@ -92,6 +92,7 @@ public void before() { azureStorage = createMock(AzureStorage.class); azureAccountConfig = new AzureAccountConfig(); + azureAccountConfig.setMaxTries(MAX_TRIES); azureAccountConfig.setAccount(ACCOUNT); segmentConfigWithPrefix = new AzureDataSegmentConfig(); @@ -115,7 +116,7 @@ public void test_push_nonUniquePathNoPrefix_succeeds() throws Exception Files.write(DATA, tmp); String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath); - azureStorage.uploadBlockBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath)); + azureStorage.uploadBlockBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath), EasyMock.eq(MAX_TRIES)); EasyMock.expectLastCall(); replayAll(); @@ -148,7 +149,8 @@ public void test_push_nonUniquePathWithPrefix_succeeds() throws Exception azureStorage.uploadBlockBlob( EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), - EasyMock.eq(PREFIX + "/" + azurePath) + EasyMock.eq(PREFIX + "/" + azurePath), + EasyMock.eq(MAX_TRIES) ); EasyMock.expectLastCall(); @@ -181,7 +183,8 @@ public void test_push_uniquePathNoPrefix_succeeds() throws Exception azureStorage.uploadBlockBlob( EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), - EasyMock.matches(UNIQUE_MATCHER_NO_PREFIX) + EasyMock.matches(UNIQUE_MATCHER_NO_PREFIX), + EasyMock.eq(MAX_TRIES) ); EasyMock.expectLastCall(); @@ -214,7 +217,8 @@ public void test_push_uniquePath_succeeds() throws Exception azureStorage.uploadBlockBlob( EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), - EasyMock.matches(UNIQUE_MATCHER_PREFIX) + EasyMock.matches(UNIQUE_MATCHER_PREFIX), + EasyMock.eq(MAX_TRIES) ); EasyMock.expectLastCall(); @@ -245,8 +249,8 @@ public void test_push_exception_throwsException() throws Exception final long size = DATA.length; String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath); - azureStorage.uploadBlockBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath)); - EasyMock.expectLastCall().andThrow(new URISyntaxException("", "")); + azureStorage.uploadBlockBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.anyString(), EasyMock.eq(MAX_TRIES)); + EasyMock.expectLastCall().andThrow(new BlobStorageException("", null, null)); replayAll(); @@ -277,14 +281,14 @@ public void getAzurePathsTest() } @Test - public void uploadDataSegmentTest() throws StorageException, IOException, URISyntaxException + public void uploadDataSegmentTest() throws BlobStorageException, IOException { AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix); final int binaryVersion = 9; final File compressedSegmentData = new File("index.zip"); final String azurePath = pusher.getAzurePath(DATA_SEGMENT, false); - azureStorage.uploadBlockBlob(compressedSegmentData, CONTAINER_NAME, azurePath); + azureStorage.uploadBlockBlob(compressedSegmentData, CONTAINER_NAME, azurePath, MAX_TRIES); EasyMock.expectLastCall(); replayAll(); diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java index 3c5fada7c10e..6d5efbae501a 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java @@ -19,6 +19,7 @@ package org.apache.druid.storage.azure; +import com.azure.storage.blob.BlobServiceClient; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; @@ -28,9 +29,6 @@ import com.google.inject.Module; import com.google.inject.ProvisionException; import com.google.inject.TypeLiteral; -import com.microsoft.azure.storage.StorageCredentials; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.ListBlobItem; import org.apache.druid.data.input.azure.AzureEntityFactory; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.guice.DruidGuiceExtensions; @@ -38,8 +36,6 @@ import org.apache.druid.guice.LazySingleton; import org.apache.druid.jackson.JacksonModule; import org.apache.druid.segment.loading.OmniDataSegmentKiller; -import org.apache.druid.storage.azure.blob.ListBlobItemHolder; -import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.junit.Assert; @@ -63,6 +59,7 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport private static final String AZURE_ACCOUNT_NAME; private static final String AZURE_ACCOUNT_KEY; private static final String AZURE_SHARED_ACCESS_TOKEN; + private static final String AZURE_MANAGED_CREDENTIAL_CLIENT_ID; private static final String AZURE_CONTAINER; private static final String AZURE_PREFIX; private static final int AZURE_MAX_LISTING_LENGTH; @@ -72,8 +69,6 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport private CloudObjectLocation cloudObjectLocation1; private CloudObjectLocation cloudObjectLocation2; - private ListBlobItem blobItem1; - private ListBlobItem blobItem2; private Injector injector; static { @@ -82,6 +77,7 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport AZURE_ACCOUNT_KEY = Base64.getUrlEncoder() .encodeToString("azureKey1".getBytes(StandardCharsets.UTF_8)); AZURE_SHARED_ACCESS_TOKEN = "dummyToken"; + AZURE_MANAGED_CREDENTIAL_CLIENT_ID = "clientId"; AZURE_CONTAINER = "azureContainer1"; AZURE_PREFIX = "azurePrefix1"; AZURE_MAX_LISTING_LENGTH = 10; @@ -97,8 +93,6 @@ public void setup() { cloudObjectLocation1 = createMock(CloudObjectLocation.class); cloudObjectLocation2 = createMock(CloudObjectLocation.class); - blobItem1 = createMock(ListBlobItem.class); - blobItem2 = createMock(ListBlobItem.class); } @Test @@ -144,55 +138,6 @@ public void testGetAzureInputDataConfigExpectedConfig() Assert.assertEquals(AZURE_MAX_LISTING_LENGTH, inputDataConfig.getMaxListingLength()); } - @Test - public void testGetBlobClientExpectedClient() - { - injector = makeInjectorWithProperties(PROPERTIES); - - Supplier cloudBlobClient = injector.getInstance( - Key.get(new TypeLiteral>(){}) - ); - StorageCredentials storageCredentials = cloudBlobClient.get().getCredentials(); - - Assert.assertEquals(AZURE_ACCOUNT_NAME, storageCredentials.getAccountName()); - } - - @Test - public void testGetAzureStorageContainerExpectedClient() - { - injector = makeInjectorWithProperties(PROPERTIES); - - Supplier cloudBlobClient = injector.getInstance( - Key.get(new TypeLiteral>(){}) - ); - StorageCredentials storageCredentials = cloudBlobClient.get().getCredentials(); - - Assert.assertEquals(AZURE_ACCOUNT_NAME, storageCredentials.getAccountName()); - - AzureStorage azureStorage = injector.getInstance(AzureStorage.class); - Assert.assertSame(cloudBlobClient.get(), azureStorage.getCloudBlobClient()); - } - - @Test - public void testGetAzureStorageContainerWithSASExpectedClient() - { - Properties properties = initializePropertes(); - properties.setProperty("druid.azure.sharedAccessStorageToken", AZURE_SHARED_ACCESS_TOKEN); - properties.remove("druid.azure.key"); - - injector = makeInjectorWithProperties(properties); - - Supplier cloudBlobClient = injector.getInstance( - Key.get(new TypeLiteral>(){}) - ); - - AzureAccountConfig azureAccountConfig = injector.getInstance(AzureAccountConfig.class); - Assert.assertEquals(AZURE_SHARED_ACCESS_TOKEN, azureAccountConfig.getSharedAccessStorageToken()); - - AzureStorage azureStorage = injector.getInstance(AzureStorage.class); - Assert.assertSame(cloudBlobClient.get(), azureStorage.getCloudBlobClient()); - } - @Test public void testGetAzureByteSourceFactoryCanCreateAzureByteSource() { @@ -247,18 +192,6 @@ public void testGetAzureCloudBlobIterableFactoryCanCreateAzureCloudBlobIterable( Assert.assertNotSame(object1, object2); } - @Test - public void testGetListBlobItemDruidFactoryCanCreateListBlobItemDruid() - { - injector = makeInjectorWithProperties(PROPERTIES); - ListBlobItemHolderFactory factory = injector.getInstance(ListBlobItemHolderFactory.class); - ListBlobItemHolder object1 = factory.create(blobItem1); - ListBlobItemHolder object2 = factory.create(blobItem2); - Assert.assertNotNull(object1); - Assert.assertNotNull(object2); - Assert.assertNotSame(object1, object2); - } - @Test public void testSegmentKillerBoundSingleton() { @@ -276,28 +209,51 @@ public void testSegmentKillerBoundSingleton() } @Test - public void testBothAccountKeyAndSAStokenSet() + public void testMultipleCredentialsSet() { + String message = "Set only one of 'key' or 'sharedAccessStorageToken' or 'useAzureCredentialsChain' in the azure config."; Properties properties = initializePropertes(); properties.setProperty("druid.azure.sharedAccessStorageToken", AZURE_SHARED_ACCESS_TOKEN); expectedException.expect(ProvisionException.class); - expectedException.expectMessage("Either set 'key' or 'sharedAccessStorageToken' in the azure config but not both"); + expectedException.expectMessage(message); + makeInjectorWithProperties(properties).getInstance( + Key.get(new TypeLiteral() + { + }) + ); + + properties = initializePropertes(); + properties.setProperty("druid.azure.managedIdentityClientId", AZURE_MANAGED_CREDENTIAL_CLIENT_ID); + expectedException.expect(ProvisionException.class); + expectedException.expectMessage(message); + makeInjectorWithProperties(properties).getInstance( + Key.get(new TypeLiteral>() + { + }) + ); + + properties = initializePropertes(); + properties.remove("druid.azure.key"); + properties.setProperty("druid.azure.managedIdentityClientId", AZURE_MANAGED_CREDENTIAL_CLIENT_ID); + properties.setProperty("druid.azure.sharedAccessStorageToken", AZURE_SHARED_ACCESS_TOKEN); + expectedException.expect(ProvisionException.class); + expectedException.expectMessage(message); makeInjectorWithProperties(properties).getInstance( - Key.get(new TypeLiteral>() + Key.get(new TypeLiteral() { }) ); } @Test - public void testBothAccountKeyAndSAStokenUnset() + public void testAllCredentialsUnset() { Properties properties = initializePropertes(); properties.remove("druid.azure.key"); expectedException.expect(ProvisionException.class); - expectedException.expectMessage("Either set 'key' or 'sharedAccessStorageToken' in the azure config but not both"); + expectedException.expectMessage("Either set 'key' or 'sharedAccessStorageToken' or 'useAzureCredentialsChain' in the azure config."); makeInjectorWithProperties(properties).getInstance( - Key.get(new TypeLiteral>() + Key.get(new TypeLiteral() { }) ); diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java index 9ae08546401a..5b980915ea93 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java @@ -19,53 +19,73 @@ package org.apache.druid.storage.azure; +import com.azure.core.http.rest.PagedIterable; +import com.azure.core.http.rest.PagedResponse; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.models.BlobItemProperties; +import com.azure.storage.blob.models.BlobStorageException; import com.google.common.collect.ImmutableList; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.CloudBlockBlob; -import com.microsoft.azure.storage.blob.ListBlobItem; +import org.apache.druid.common.guava.SettableSupplier; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.List; +// Using Mockito for the whole test class since azure classes (e.g. BlobContainerClient) are final and can't be mocked with EasyMock public class AzureStorageTest { AzureStorage azureStorage; - CloudBlobClient cloudBlobClient = Mockito.mock(CloudBlobClient.class); - CloudBlobContainer cloudBlobContainer = Mockito.mock(CloudBlobContainer.class); + BlobServiceClient blobServiceClient = Mockito.mock(BlobServiceClient.class); + BlobContainerClient blobContainerClient = Mockito.mock(BlobContainerClient.class); + AzureClientFactory azureClientFactory = Mockito.mock(AzureClientFactory.class); + + private final String CONTAINER = "container"; + private final String BLOB_NAME = "blobName"; + private final Integer MAX_ATTEMPTS = 3; @Before - public void setup() throws URISyntaxException, StorageException + public void setup() throws BlobStorageException { - Mockito.doReturn(cloudBlobContainer).when(cloudBlobClient).getContainerReference(ArgumentMatchers.anyString()); - azureStorage = new AzureStorage(() -> cloudBlobClient); + azureStorage = new AzureStorage(azureClientFactory); } @Test - public void testListDir() throws URISyntaxException, StorageException + public void testListDir_retriable() throws BlobStorageException { - List listBlobItems = ImmutableList.of( - new CloudBlockBlob(new URI("azure://dummy.com/container/blobName")) + BlobItem blobItem = new BlobItem().setName(BLOB_NAME).setProperties(new BlobItemProperties().setContentLength(10L)); + SettableSupplier> supplier = new SettableSupplier<>(); + supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem))); + PagedIterable pagedIterable = new PagedIterable<>(supplier); + Mockito.doReturn(pagedIterable).when(blobContainerClient).listBlobs( + ArgumentMatchers.any(), + ArgumentMatchers.any() ); + Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); + Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(MAX_ATTEMPTS); - Mockito.doReturn(listBlobItems).when(cloudBlobContainer).listBlobs( - ArgumentMatchers.anyString(), - ArgumentMatchers.anyBoolean(), - ArgumentMatchers.any(), + Assert.assertEquals(ImmutableList.of(BLOB_NAME), azureStorage.listDir(CONTAINER, "", MAX_ATTEMPTS)); + } + + @Test + public void testListDir_nullMaxAttempts() throws BlobStorageException + { + BlobItem blobItem = new BlobItem().setName(BLOB_NAME).setProperties(new BlobItemProperties().setContentLength(10L)); + SettableSupplier> supplier = new SettableSupplier<>(); + supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem))); + PagedIterable pagedIterable = new PagedIterable<>(supplier); + Mockito.doReturn(pagedIterable).when(blobContainerClient).listBlobs( ArgumentMatchers.any(), ArgumentMatchers.any() - ); - Assert.assertEquals(ImmutableList.of("blobName"), azureStorage.listDir("test", "")); + Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); + Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null); + Assert.assertEquals(ImmutableList.of(BLOB_NAME), azureStorage.listDir(CONTAINER, "", null)); } } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java index 2575793176e3..3ccb5fd448eb 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java @@ -19,10 +19,10 @@ package org.apache.druid.storage.azure; +import com.azure.storage.blob.models.BlobStorageException; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.microsoft.azure.storage.StorageException; import org.apache.commons.io.IOUtils; import org.apache.druid.common.utils.CurrentTimeMillisSupplier; import org.apache.druid.java.util.common.FileUtils; @@ -41,7 +41,6 @@ import java.io.InputStream; import java.io.StringWriter; import java.net.URI; -import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; public class AzureTaskLogsTest extends EasyMockSupport @@ -51,8 +50,7 @@ public class AzureTaskLogsTest extends EasyMockSupport private static final String PREFIX = "test/log"; private static final String TASK_ID = "taskid"; private static final String TASK_ID_NOT_FOUND = "taskidNotFound"; - private static final int MAX_TRIES = 3; - private static final AzureTaskLogsConfig AZURE_TASK_LOGS_CONFIG = new AzureTaskLogsConfig(CONTAINER, PREFIX, MAX_TRIES); + private static final AzureTaskLogsConfig AZURE_TASK_LOGS_CONFIG = new AzureTaskLogsConfig(CONTAINER, PREFIX); private static final int MAX_KEYS = 1; private static final long TIME_0 = 0L; private static final long TIME_1 = 1L; @@ -61,8 +59,9 @@ public class AzureTaskLogsTest extends EasyMockSupport private static final String KEY_1 = "key1"; private static final String KEY_2 = "key2"; private static final URI PREFIX_URI = URI.create(StringUtils.format("azure://%s/%s", CONTAINER, PREFIX)); - private static final Exception RECOVERABLE_EXCEPTION = new StorageException("", "", null); - private static final Exception NON_RECOVERABLE_EXCEPTION = new URISyntaxException("", ""); + private static final int MAX_TRIES = 3; + + private static final Exception NON_RECOVERABLE_EXCEPTION = new BlobStorageException("", null, null); private AzureInputDataConfig inputDataConfig; private AzureAccountConfig accountConfig; @@ -97,9 +96,11 @@ public void test_PushTaskLog_uploadsBlob() throws Exception try { final File logFile = new File(tmpDir, "log"); - azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log"); + azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log", MAX_TRIES); EasyMock.expectLastCall(); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes(); + replayAll(); azureTaskLogs.pushTaskLog(TASK_ID, logFile); @@ -119,7 +120,8 @@ public void test_PushTaskLog_exception_rethrowsException() throws Exception try { final File logFile = new File(tmpDir, "log"); - azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log"); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes(); + azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log", MAX_TRIES); EasyMock.expectLastCall().andThrow(new IOException()); replayAll(); @@ -141,7 +143,8 @@ public void test_PushTaskReports_uploadsBlob() throws Exception try { final File logFile = new File(tmpDir, "log"); - azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json"); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes(); + azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json", MAX_TRIES); EasyMock.expectLastCall(); replayAll(); @@ -163,7 +166,8 @@ public void test_PushTaskStatus_uploadsBlob() throws Exception try { final File logFile = new File(tmpDir, "status.json"); - azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/status.json"); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes(); + azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/status.json", MAX_TRIES); EasyMock.expectLastCall(); replayAll(); @@ -185,7 +189,8 @@ public void test_PushTaskReports_exception_rethrowsException() throws Exception try { final File logFile = new File(tmpDir, "log"); - azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json"); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes(); + azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json", MAX_TRIES); EasyMock.expectLastCall().andThrow(new IOException()); replayAll(); @@ -318,7 +323,7 @@ public void test_streamTaskReports_exceptionWhenGettingStream_throwsException() EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true); EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length()); EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andThrow( - new URISyntaxException("", "")); + new BlobStorageException("", null, null)); replayAll(); @@ -333,10 +338,9 @@ public void test_streamTaskReports_exceptionWhenGettingStream_throwsException() @Test(expected = IOException.class) public void test_streamTaskReports_exceptionWhenCheckingBlobExistence_throwsException() throws Exception { - final String testLog = "hello this is a log"; final String blobPath = PREFIX + "/" + TASK_ID + "/report.json"; - EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", "")); + EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andThrow(new BlobStorageException("", null, null)); replayAll(); @@ -393,7 +397,7 @@ public void test_streamTaskStatus_exceptionWhenGettingStream_throwsException() t EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true); EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) taskStatus.length()); EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andThrow( - new URISyntaxException("", "")); + new BlobStorageException("", null, null)); replayAll(); @@ -409,7 +413,7 @@ public void test_streamTaskStatus_exceptionWhenGettingStream_throwsException() t public void test_streamTaskStatus_exceptionWhenCheckingBlobExistence_throwsException() throws Exception { final String blobPath = PREFIX + "/" + TASK_ID + "/status.json"; - EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", "")); + EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andThrow(new BlobStorageException("", null, null)); replayAll(); @@ -422,8 +426,8 @@ public void test_streamTaskStatus_exceptionWhenCheckingBlobExistence_throwsExcep public void test_killAll_noException_deletesAllTaskLogs() throws Exception { EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes(); CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_1); @@ -438,47 +442,24 @@ public void test_killAll_noException_deletesAllTaskLogs() throws Exception AzureTestUtils.expectDeleteObjects( azureStorage, ImmutableList.of(object1, object2), - ImmutableMap.of()); + ImmutableMap.of(), + MAX_TRIES + ); EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); azureTaskLogs.killAll(); EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, object2, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); } @Test - public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws Exception - { - EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); - EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); - - CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); - - AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects( - azureCloudBlobIterableFactory, - MAX_KEYS, - PREFIX_URI, - ImmutableList.of(object1)); - - EasyMock.replay(object1); - AzureTestUtils.expectDeleteObjects( - azureStorage, - ImmutableList.of(object1), - ImmutableMap.of(object1, RECOVERABLE_EXCEPTION)); - EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); - azureTaskLogs.killAll(); - EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); - } - - @Test - public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() throws Exception + public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() { boolean ioExceptionThrown = false; CloudBlobHolder object1 = null; AzureCloudBlobIterable azureCloudBlobIterable = null; try { EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes(); object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); @@ -493,7 +474,8 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteA AzureTestUtils.expectDeleteObjects( azureStorage, ImmutableList.of(), - ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION) + ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION), + MAX_TRIES ); EasyMock.replay( inputDataConfig, @@ -524,7 +506,7 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteA public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws Exception { EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes(); CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_FUTURE); @@ -539,45 +521,23 @@ public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws AzureTestUtils.expectDeleteObjects( azureStorage, ImmutableList.of(object1), - ImmutableMap.of()); + ImmutableMap.of(), + MAX_TRIES + ); EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); azureTaskLogs.killOlderThan(TIME_NOW); EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, object2, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); } @Test - public void test_killOlderThan_recoverableExceptionWhenDeletingObjects_deletesOnlyTaskLogsOlderThan() throws Exception - { - EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); - - CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); - - AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects( - azureCloudBlobIterableFactory, - MAX_KEYS, - PREFIX_URI, - ImmutableList.of(object1)); - - EasyMock.replay(object1); - AzureTestUtils.expectDeleteObjects( - azureStorage, - ImmutableList.of(object1), - ImmutableMap.of(object1, RECOVERABLE_EXCEPTION)); - EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); - azureTaskLogs.killOlderThan(TIME_NOW); - EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage); - } - - @Test - public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() throws Exception + public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() { boolean ioExceptionThrown = false; CloudBlobHolder object1 = null; AzureCloudBlobIterable azureCloudBlobIterable = null; try { EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce(); + EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes(); object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0); @@ -592,7 +552,8 @@ public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntD AzureTestUtils.expectDeleteObjects( azureStorage, ImmutableList.of(), - ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION) + ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION), + MAX_TRIES ); EasyMock.replay( inputDataConfig, diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java index efd1b9290b8e..67ec5b8e58d6 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java @@ -68,7 +68,9 @@ public static AzureCloudBlobIterable expectListObjects( public static void expectDeleteObjects( AzureStorage storage, List deleteRequestsExpected, - Map deleteRequestToException) throws Exception + Map deleteRequestToException, + Integer maxTries + ) { Map> requestToResultExpectationSetter = new HashMap<>(); @@ -77,7 +79,7 @@ public static void expectDeleteObjects( Exception exception = requestsAndErrors.getValue(); IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject); if (resultExpectationSetter == null) { - storage.emptyCloudBlobDirectory(deleteObject.getContainerName(), deleteObject.getName()); + storage.emptyCloudBlobDirectory(deleteObject.getContainerName(), deleteObject.getName(), maxTries); resultExpectationSetter = EasyMock.expectLastCall().andThrow(exception); requestToResultExpectationSetter.put(deleteObject, resultExpectationSetter); } else { @@ -88,7 +90,7 @@ public static void expectDeleteObjects( for (CloudBlobHolder deleteObject : deleteRequestsExpected) { IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject); if (resultExpectationSetter == null) { - storage.emptyCloudBlobDirectory(deleteObject.getContainerName(), deleteObject.getName()); + storage.emptyCloudBlobDirectory(deleteObject.getContainerName(), deleteObject.getName(), maxTries); resultExpectationSetter = EasyMock.expectLastCall(); requestToResultExpectationSetter.put(deleteObject, resultExpectationSetter); } @@ -99,7 +101,7 @@ public static void expectDeleteObjects( public static CloudBlobHolder newCloudBlobHolder( String container, String prefix, - long lastModifiedTimestamp) throws Exception + long lastModifiedTimestamp) { CloudBlobHolder object = EasyMock.createMock(CloudBlobHolder.class); EasyMock.expect(object.getContainerName()).andReturn(container).anyTimes(); diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureUtilsTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureUtilsTest.java index f75370703ce9..0222100c43ed 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureUtilsTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureUtilsTest.java @@ -19,16 +19,24 @@ package org.apache.druid.storage.azure; -import com.microsoft.azure.storage.StorageException; +import com.azure.core.http.HttpResponse; +import com.azure.storage.blob.models.BlobStorageException; import org.apache.druid.data.input.azure.AzureInputSource; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.concurrent.TimeoutException; -public class AzureUtilsTest +@RunWith(EasyMockRunner.class) +public class AzureUtilsTest extends EasyMockSupport { private static final String CONTAINER_NAME = "container1"; private static final String BLOB_NAME = "blob1"; @@ -39,7 +47,7 @@ public class AzureUtilsTest private static final URI URI_WITH_PATH_WITH_LEADING_SLASH; private static final URISyntaxException URI_SYNTAX_EXCEPTION = new URISyntaxException("", ""); - private static final StorageException STORAGE_EXCEPTION = new StorageException("", "", null); + private static final IOException IO_EXCEPTION = new IOException(); private static final RuntimeException RUNTIME_EXCEPTION = new RuntimeException(); private static final RuntimeException NULL_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTION = new RuntimeException("", null); @@ -64,6 +72,9 @@ public class AzureUtilsTest } } + @Mock + private HttpResponse httpResponse; + @Test public void test_extractAzureKey_pathHasLeadingSlash_returnsPathWithLeadingSlashRemoved() { @@ -93,9 +104,64 @@ public void test_azureRetry_URISyntaxException_returnsFalse() } @Test - public void test_azureRetry_StorageException_returnsTrue() + public void test_azureRetry_StorageException_500ErrorCode_returnsTrue() + { + EasyMock.expect(httpResponse.getStatusCode()).andReturn(500).anyTimes(); + + replayAll(); + BlobStorageException blobStorageException = new BlobStorageException("storage exception", httpResponse, null); + boolean retry = AzureUtils.AZURE_RETRY.apply(blobStorageException); + verifyAll(); + Assert.assertTrue(retry); + } + + @Test + public void test_azureRetry_StorageException_429ErrorCode_returnsTrue() + { + EasyMock.expect(httpResponse.getStatusCode()).andReturn(429).anyTimes(); + + replayAll(); + BlobStorageException blobStorageException = new BlobStorageException("storage exception", httpResponse, null); + boolean retry = AzureUtils.AZURE_RETRY.apply(blobStorageException); + verifyAll(); + Assert.assertTrue(retry); + } + + @Test + public void test_azureRetry_StorageException_503ErrorCode_returnsTrue() + { + EasyMock.expect(httpResponse.getStatusCode()).andReturn(503).anyTimes(); + + replayAll(); + BlobStorageException blobStorageException = new BlobStorageException("storage exception", httpResponse, null); + boolean retry = AzureUtils.AZURE_RETRY.apply(blobStorageException); + verifyAll(); + Assert.assertTrue(retry); + } + + @Test + public void test_azureRetry_StorageException_400ErrorCode_returnsFalse() + { + EasyMock.expect(httpResponse.getStatusCode()).andReturn(400).anyTimes(); + + replayAll(); + BlobStorageException blobStorageException = new BlobStorageException("storage exception", httpResponse, null); + boolean retry = AzureUtils.AZURE_RETRY.apply(blobStorageException); + verifyAll(); + Assert.assertFalse(retry); + } + + @Test + public void test_azureRetry_nestedIOException_returnsTrue() + { + boolean retry = AzureUtils.AZURE_RETRY.apply(new RuntimeException("runtime", new IOException("ioexception"))); + Assert.assertTrue(retry); + } + + @Test + public void test_azureRetry_nestedTimeoutException_returnsTrue() { - boolean retry = AzureUtils.AZURE_RETRY.apply(STORAGE_EXCEPTION); + boolean retry = AzureUtils.AZURE_RETRY.apply(new RuntimeException("runtime", new TimeoutException("timeout exception"))); Assert.assertTrue(retry); } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/TestPagedResponse.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/TestPagedResponse.java new file mode 100644 index 000000000000..0385806915cc --- /dev/null +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/TestPagedResponse.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.azure; + +import com.azure.core.http.HttpHeaders; +import com.azure.core.http.HttpRequest; +import com.azure.core.http.rest.PagedResponse; +import com.azure.core.util.IterableStream; + +import java.util.Collection; + +public class TestPagedResponse implements PagedResponse +{ + private final Collection responseItems; + + public TestPagedResponse(Collection responseItems) + { + this.responseItems = responseItems; + } + + @Override + public int getStatusCode() + { + return 0; + } + + @Override + public HttpHeaders getHeaders() + { + return null; + } + + @Override + public HttpRequest getRequest() + { + return null; + } + + @Override + public IterableStream getElements() + { + return IterableStream.of(responseItems); + } + + @Override + public String getContinuationToken() + { + return null; + } + + @Override + public void close() + { + + } +} diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java index f8592c32eaf8..1bdbe0a2ece0 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java @@ -19,9 +19,9 @@ package org.apache.druid.storage.azure.output; +import com.azure.storage.blob.models.BlobStorageException; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.microsoft.azure.storage.StorageException; import org.apache.commons.io.IOUtils; import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.azure.AzureStorage; @@ -35,7 +35,6 @@ import java.io.IOException; import java.io.InputStream; -import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; @@ -64,7 +63,7 @@ public void setup() throws IOException @Test - public void testPathExistsSuccess() throws URISyntaxException, StorageException, IOException + public void testPathExistsSuccess() throws BlobStorageException, IOException { final Capture bucket = Capture.newInstance(); final Capture path = Capture.newInstance(); @@ -79,7 +78,7 @@ public void testPathExistsSuccess() throws URISyntaxException, StorageException, } @Test - public void testPathExistsNotFound() throws URISyntaxException, StorageException, IOException + public void testPathExistsNotFound() throws BlobStorageException, IOException { final Capture bucket = Capture.newInstance(); final Capture path = Capture.newInstance(); @@ -94,7 +93,7 @@ public void testPathExistsNotFound() throws URISyntaxException, StorageException } @Test - public void testRead() throws URISyntaxException, StorageException, IOException + public void testRead() throws BlobStorageException, IOException { EasyMock.reset(azureStorage); @@ -122,7 +121,7 @@ public void testRead() throws URISyntaxException, StorageException, IOException } @Test - public void testReadRange() throws URISyntaxException, StorageException, IOException + public void testReadRange() throws BlobStorageException, IOException { String data = "test"; @@ -151,7 +150,7 @@ public void testReadRange() throws URISyntaxException, StorageException, IOExcep } @Test - public void testDeleteSinglePath() throws URISyntaxException, StorageException, IOException + public void testDeleteSinglePath() throws BlobStorageException, IOException { EasyMock.reset(azureStorage); Capture containerCapture = EasyMock.newCapture(); @@ -169,7 +168,7 @@ public void testDeleteSinglePath() throws URISyntaxException, StorageException, } @Test - public void testDeleteMultiplePaths() throws URISyntaxException, StorageException, IOException + public void testDeleteMultiplePaths() throws BlobStorageException, IOException { EasyMock.reset(azureStorage); Capture containerCapture = EasyMock.newCapture(); @@ -189,7 +188,7 @@ public void testDeleteMultiplePaths() throws URISyntaxException, StorageExceptio } @Test - public void testListDir() throws URISyntaxException, StorageException, IOException + public void testListDir() throws BlobStorageException, IOException { EasyMock.reset(azureStorage); EasyMock.expect(azureStorage.listDir(EasyMock.anyString(), EasyMock.anyString(), EasyMock.anyInt())) diff --git a/licenses.yaml b/licenses.yaml index 6f3fc81f49d0..2ff53e9e9795 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -254,8 +254,10 @@ libraries: - com.fasterxml.jackson.core: jackson-core - com.fasterxml.jackson.dataformat: jackson-dataformat-cbor - com.fasterxml.jackson.dataformat: jackson-dataformat-smile + - com.fasterxml.jackson.dataformat: jackson-dataformat-xml - com.fasterxml.jackson.datatype: jackson-datatype-guava - com.fasterxml.jackson.datatype: jackson-datatype-joda + - com.fasterxml.jackson.datatype: jackson-datatype-jsr310 - com.fasterxml.jackson.jaxrs: jackson-jaxrs-base - com.fasterxml.jackson.jaxrs: jackson-jaxrs-json-provider - com.fasterxml.jackson.jaxrs: jackson-jaxrs-smile-provider @@ -1305,6 +1307,11 @@ libraries: - io.netty: netty-transport-classes-epoll - io.netty: netty-transport-native-epoll - io.netty: netty-transport-native-unix-common + - io.netty: netty-codec-http2 + - io.netty: netty-resolver-dns-classes-macos + - io.netty: netty-transport-classes-kqueue + - io.netty: netty-resolver-dns-native-macos + - io.netty: netty-transport-native-kqueue notice: | == The Netty Project @@ -4544,14 +4551,406 @@ libraries: --- -name: Microsoft Azure Storage Client SDK +name: Microsoft Azure Blob Storage SDK license_category: binary module: extensions/druid-azure-extensions -license_name: Apache License version 2.0 +license_name: MIT License +copyright: Microsoft +version: 12.25.1 +libraries: + - com.azure: azure-storage-blob + +--- + +name: Microsoft Azure Identity SDK +license_category: binary +module: extensions/druid-azure-extensions +license_name: MIT License copyright: Microsoft -version: 8.6.0 +version: 1.11.1 libraries: - - com.microsoft.azure: azure-storage + - com.azure: azure-identity + +--- + +name: Microsoft Azure Batch Blob Storage SDK +license_category: binary +module: extensions/druid-azure-extensions +license_name: MIT License +copyright: Microsoft +version: 12.21.1 +libraries: + - com.azure: azure-storage-blob-batch + +--- + +name: Microsoft Azure Storage Common +license_category: binary +module: extensions/druid-azure-extensions +license_name: MIT License +copyright: Microsoft +version: 12.24.1 +libraries: + - com.azure: azure-storage-common + +--- + +name: Microsoft Azure Internal Avro +license_category: binary +module: extensions/druid-azure-extensions +license_name: MIT License +copyright: Microsoft +version: 12.10.1 +libraries: + - com.azure: azure-storage-internal-avro + +--- + +name: Microsoft Azure JSON +license_category: binary +module: extensions/druid-azure-extensions +license_name: MIT License +copyright: Microsoft +version: 1.1.0 +libraries: + - com.azure: azure-json + +--- +name: Microsoft Azure Netty Http +license_category: binary +module: extensions/druid-azure-extensions +license_name: MIT License +copyright: Microsoft +version: 1.13.11 +libraries: + - com.azure: azure-core-http-netty + +--- + +name: Microsoft Azure Core +license_category: binary +module: extensions/druid-azure-extensions +license_name: MIT License +copyright: Microsoft +version: 1.45.1 +libraries: + - com.azure: azure-core + +--- + +name: Microsoft MSAL4J +license_category: binary +module: extensions/druid-azure-extensions +license_name: MIT License +copyright: Microsoft +version: 1.14.0 +libraries: + - com.microsoft.azure: msal4j + +--- + +name: Microsoft MSAL4J Persistence +license_category: binary +module: extensions/druid-azure-extensions +license_name: MIT License +copyright: Microsoft +version: 1.2.0 +libraries: + - com.microsoft.azure: msal4j-persistence-extension + +--- + +name: NimbusDS Content Type +license_category: binary +module: extensions/druid-azure-extensions +license_name: Apache License version 2.0 +version: 2.2 +libraries: + - com.nimbusds: content-type + +--- + +name: NimbusDS Jose +license_category: binary +module: extensions/druid-azure-extensions +license_name: Apache License version 2.0 +version: 9.30.2 +libraries: + - com.nimbusds: nimbus-jose-jwt + +--- + +name: NimbusDS Oauth +license_category: binary +module: extensions/druid-azure-extensions +license_name: Apache License version 2.0 +version: 10.7.1 +libraries: + - com.nimbusds: oauth2-oidc-sdk + +--- + +name: Reactor Netty +license_category: binary +module: extensions/druid-azure-extensions +license_name: Apache License version 2.0 +version: 1.0.39 +libraries: + - io.projectreactor.netty: reactor-netty-core + - io.projectreactor.netty: reactor-netty-http + +--- + +name: Reactor Core +license_category: binary +module: extensions/druid-azure-extensions +license_name: Apache License version 2.0 +version: 3.4.34 +libraries: + - io.projectreactor: reactor-core +--- + +name: Woodstox +license_category: binary +module: extensions/druid-azure-extensions +license_name: Apache License version 2.0 +version: 6.2.4 +libraries: + - com.fasterxml.woodstox: woodstox-core + +--- +name: Netty +license_category: binary +module: extensions/druid-azure-extensions +license_name: Apache License version 2.0 +version: 2.0.61.Final +libraries: + - io.netty: netty-tcnative-boringssl-static + - io.netty: netty-tcnative-classes +notice: | + == + The Netty Project + ================= + + Please visit the Netty web site for more information: + + * http://netty.io/ + + Copyright 2014 The Netty Project + + The Netty Project licenses this file to you under the Apache License, + version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + Also, please refer to each LICENSE..txt file, which is located in + the 'license' directory of the distribution file, for the license terms of the + components that this product depends on. + + ------------------------------------------------------------------------------- + This product contains the extensions to Java Collections Framework which has + been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene: + + * LICENSE: + * license/LICENSE.jsr166y.txt (Public Domain) + * HOMEPAGE: + * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/ + * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/ + + This product contains a modified version of Robert Harder's Public Domain + Base64 Encoder and Decoder, which can be obtained at: + + * LICENSE: + * license/LICENSE.base64.txt (Public Domain) + * HOMEPAGE: + * http://iharder.sourceforge.net/current/java/base64/ + + This product contains a modified portion of 'Webbit', an event based + WebSocket and HTTP server, which can be obtained at: + + * LICENSE: + * license/LICENSE.webbit.txt (BSD License) + * HOMEPAGE: + * https://github.com/joewalnes/webbit + + This product contains a modified portion of 'SLF4J', a simple logging + facade for Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.slf4j.txt (MIT License) + * HOMEPAGE: + * http://www.slf4j.org/ + + This product contains a modified portion of 'Apache Harmony', an open source + Java SE, which can be obtained at: + + * NOTICE: + * license/NOTICE.harmony.txt + * LICENSE: + * license/LICENSE.harmony.txt (Apache License 2.0) + * HOMEPAGE: + * http://archive.apache.org/dist/harmony/ + + This product contains a modified portion of 'jbzip2', a Java bzip2 compression + and decompression library written by Matthew J. Francis. It can be obtained at: + + * LICENSE: + * license/LICENSE.jbzip2.txt (MIT License) + * HOMEPAGE: + * https://code.google.com/p/jbzip2/ + + This product contains a modified portion of 'libdivsufsort', a C API library to construct + the suffix array and the Burrows-Wheeler transformed string for any input string of + a constant-size alphabet written by Yuta Mori. It can be obtained at: + + * LICENSE: + * license/LICENSE.libdivsufsort.txt (MIT License) + * HOMEPAGE: + * https://github.com/y-256/libdivsufsort + + This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM, + which can be obtained at: + + * LICENSE: + * license/LICENSE.jctools.txt (ASL2 License) + * HOMEPAGE: + * https://github.com/JCTools/JCTools + + This product optionally depends on 'JZlib', a re-implementation of zlib in + pure Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.jzlib.txt (BSD style License) + * HOMEPAGE: + * http://www.jcraft.com/jzlib/ + + This product optionally depends on 'Compress-LZF', a Java library for encoding and + decoding data in LZF format, written by Tatu Saloranta. It can be obtained at: + + * LICENSE: + * license/LICENSE.compress-lzf.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/ning/compress + + This product optionally depends on 'lz4', a LZ4 Java compression + and decompression library written by Adrien Grand. It can be obtained at: + + * LICENSE: + * license/LICENSE.lz4.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jpountz/lz4-java + + This product optionally depends on 'lzma-java', a LZMA Java compression + and decompression library, which can be obtained at: + + * LICENSE: + * license/LICENSE.lzma-java.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jponge/lzma-java + + This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression + and decompression library written by William Kinney. It can be obtained at: + + * LICENSE: + * license/LICENSE.jfastlz.txt (MIT License) + * HOMEPAGE: + * https://code.google.com/p/jfastlz/ + + This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data + interchange format, which can be obtained at: + + * LICENSE: + * license/LICENSE.protobuf.txt (New BSD License) + * HOMEPAGE: + * https://github.com/google/protobuf + + This product optionally depends on 'Bouncy Castle Crypto APIs' to generate + a temporary self-signed X.509 certificate when the JVM does not provide the + equivalent functionality. It can be obtained at: + + * LICENSE: + * license/LICENSE.bouncycastle.txt (MIT License) + * HOMEPAGE: + * http://www.bouncycastle.org/ + + This product optionally depends on 'Snappy', a compression library produced + by Google Inc, which can be obtained at: + + * LICENSE: + * license/LICENSE.snappy.txt (New BSD License) + * HOMEPAGE: + * https://github.com/google/snappy + + This product optionally depends on 'JBoss Marshalling', an alternative Java + serialization API, which can be obtained at: + + * LICENSE: + * license/LICENSE.jboss-marshalling.txt (GNU LGPL 2.1) + * HOMEPAGE: + * http://www.jboss.org/jbossmarshalling + + This product optionally depends on 'Caliper', Google's micro- + benchmarking framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.caliper.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/google/caliper + + This product optionally depends on 'Apache Commons Logging', a logging + framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-logging.txt (Apache License 2.0) + * HOMEPAGE: + * http://commons.apache.org/logging/ + + This product optionally depends on 'Apache Log4J', a logging framework, which + can be obtained at: + + * LICENSE: + * license/LICENSE.log4j.txt (Apache License 2.0) + * HOMEPAGE: + * http://logging.apache.org/log4j/ + + This product optionally depends on 'Aalto XML', an ultra-high performance + non-blocking XML processor, which can be obtained at: + + * LICENSE: + * license/LICENSE.aalto-xml.txt (Apache License 2.0) + * HOMEPAGE: + * http://wiki.fasterxml.com/AaltoHome + + This product contains a modified version of 'HPACK', a Java implementation of + the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at: + + * LICENSE: + * license/LICENSE.hpack.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/twitter/hpack + + This product contains a modified portion of 'Apache Commons Lang', a Java library + provides utilities for the java.lang API, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-lang.txt (Apache License 2.0) + * HOMEPAGE: + * https://commons.apache.org/proper/commons-lang/ + + + This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build. + + * LICENSE: + * license/LICENSE.mvn-wrapper.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/takari/maven-wrapper --- diff --git a/website/.spelling b/website/.spelling index ec714615199b..6183e80aa316 100644 --- a/website/.spelling +++ b/website/.spelling @@ -2325,4 +2325,6 @@ isLeader taskslots loadstatus sqlQueryId +useAzureCredentialsChain +DefaultAzureCredential LAST_VALUE