Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure client upgrade to allow identity options #15287

Merged
merged 51 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
a77a257
Include new dependencies
georgew5656 Oct 19, 2023
17e7074
Mostly implemented
georgew5656 Oct 19, 2023
5aa991f
More azure fixes
georgew5656 Oct 19, 2023
dfe948b
Tests passing
georgew5656 Oct 23, 2023
02343d4
Unit tests running
georgew5656 Oct 23, 2023
70c6cf6
Test running after removing storage exception
georgew5656 Oct 23, 2023
d9b28eb
Happy with coverage now
georgew5656 Oct 24, 2023
983ff52
Add more tests
georgew5656 Oct 24, 2023
9aaf451
fix client factory
georgew5656 Oct 25, 2023
6a99496
cleanup from testing
georgew5656 Oct 26, 2023
ddcfc03
Remove old client
georgew5656 Oct 30, 2023
305eb85
update docs
georgew5656 Oct 31, 2023
9e84ade
Exclude from spellcheck
georgew5656 Oct 31, 2023
fae6fa1
Add licenses
georgew5656 Nov 1, 2023
c2a82fe
Fix identity version
georgew5656 Nov 1, 2023
43e81c5
Save work
georgew5656 Nov 1, 2023
9812a67
Add azure clients
georgew5656 Nov 1, 2023
28cbf0a
Merge branch 'master' of github.com:georgew5656/druid into azureClien…
georgew5656 Nov 2, 2023
12ab481
fix merge conflict
georgew5656 Nov 2, 2023
0f80541
add licenses
georgew5656 Nov 2, 2023
405310c
typos
georgew5656 Nov 3, 2023
7da2f4d
Add dependencies
georgew5656 Nov 3, 2023
13dd4d2
Merge branch 'master' of github.com:georgew5656/druid into azureClien…
georgew5656 Nov 3, 2023
0787dbb
Exception is not thrown
georgew5656 Nov 3, 2023
295db27
Fix intellij check
georgew5656 Nov 3, 2023
bf74260
Don't need to override
georgew5656 Nov 3, 2023
260d72f
specify length
georgew5656 Nov 8, 2023
bb72518
urldecode
georgew5656 Nov 9, 2023
129d3d9
encode path
georgew5656 Nov 9, 2023
d023f63
Fix checks
georgew5656 Nov 9, 2023
2c5d257
Merge branch 'master' into azureClientUpgrade
georgew5656 Nov 9, 2023
423d226
Revert urlencode changes
georgew5656 Nov 15, 2023
60a7b96
Urlencode with azure library
georgew5656 Nov 15, 2023
ce79b59
Update docs/development/extensions-core/azure.md
georgew5656 Nov 15, 2023
cfff1ee
Merge branch 'azureClientUpgrade' of github.com:georgew5656/druid int…
georgew5656 Nov 15, 2023
9ca19e0
PR changes
georgew5656 Nov 15, 2023
8133e1d
Update docs/development/extensions-core/azure.md
georgew5656 Nov 15, 2023
fe5758f
Deprecate AzureTaskLogsConfig.maxRetries
georgew5656 Nov 27, 2023
92f7535
Merge branch 'azureClientUpgrade' of github.com:georgew5656/druid int…
georgew5656 Nov 27, 2023
a4fa3a2
Clean up azure retry block
georgew5656 Nov 27, 2023
4654815
logic update to reuse clients
georgew5656 Nov 27, 2023
be67328
fix comments
georgew5656 Nov 27, 2023
292ff42
Create container conditionally
georgew5656 Nov 28, 2023
3dfabb3
Fix key auth
georgew5656 Nov 28, 2023
53f7dd7
Remove container client logic
georgew5656 Dec 7, 2023
daa6179
Add some more testing
georgew5656 Dec 7, 2023
9a069ea
Update comments
georgew5656 Dec 8, 2023
9f04d39
Add a comment explaining client reuse
georgew5656 Dec 8, 2023
6899704
Move logic to factory class
georgew5656 Jan 3, 2024
907c4e9
use bom for dependency management
georgew5656 Jan 3, 2024
8d2d3c0
fix license versions
georgew5656 Jan 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/development/extensions-core/azure.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ To use this Apache Druid extension, [include](../../configuration/extensions.md#
|--------|---------------|-----------|-------|
|`druid.storage.type`|azure||Must be set.|
|`druid.azure.account`||Azure Storage account name.|Must be set.|
|`druid.azure.key`||Azure Storage account key.|Optional. Either set key or sharedAccessStorageToken but not both.|
|`druid.azure.sharedAccessStorageToken`||Azure Shared Storage access token|Optional. Either set key or sharedAccessStorageToken but not both.|
|`druid.azure.key`||Azure Storage account key.|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain.|
|`druid.azure.sharedAccessStorageToken`||Azure Shared Storage access token|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain..|
|`druid.azure.useAzureCredentialsChain`|Use [DefaultAzureCredential](https://learn.microsoft.com/en-us/java/api/overview/azure/identity-readme?view=azure-java-stable) for authentication|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain.|False|
|`druid.azure.managedIdentityClientId`|If you want to use managed identity authentication in the `DefaultAzureCredential`, `useAzureCredentialsChain` must be true.||Optional.|
|`druid.azure.container`||Azure Storage container name.|Must be set.|
|`druid.azure.prefix`|A prefix string that will be prepended to the blob names for the segments published to Azure deep storage| |""|
|`druid.azure.protocol`|the protocol to use|http or https|https|
Expand Down
51 changes: 28 additions & 23 deletions extensions-core/azure-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,30 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>8.6.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.24.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob-batch</artifactId>
<version>12.20.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-common</artifactId>
<version>12.23.0</version>
</dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these dependencies be the same version?

Copy link
Contributor Author

@georgew5656 georgew5656 Dec 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately azure does not sync these versions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://learn.microsoft.com/en-us/azure/developer/java/sdk/get-started-maven#use-the-azure-sdk-for-java-build-tool - have you looked into adding this to see if the azure libraries are added correctly?

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.43.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
Expand Down Expand Up @@ -129,7 +130,11 @@
<artifactId>commons-lang</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>provided</scope>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

package org.apache.druid.data.input.azure;

import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CloudObjectInputSource;
Expand All @@ -42,7 +42,6 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -150,7 +149,7 @@ public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> pre
blob.getBlobLength()
);
}
catch (URISyntaxException | StorageException e) {
catch (BlobStorageException e) {
throw new RuntimeException(e);
}
}
Expand All @@ -161,14 +160,14 @@ public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> pre
public long getObjectSize(CloudObjectLocation location)
{
try {
final CloudBlob blobWithAttributes = storage.getBlockBlobReferenceWithAttributes(
final BlockBlobClient blobWithAttributes = storage.getBlockBlobReferenceWithAttributes(
location.getBucket(),
location.getPath()
);

return blobWithAttributes.getProperties().getLength();
return blobWithAttributes.getProperties().getBlobSize();
}
catch (URISyntaxException | StorageException e) {
catch (BlobStorageException e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public class AzureAccountConfig
@JsonProperty
private String sharedAccessStorageToken;

@JsonProperty
private String managedIdentityClientId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add validation to error out if the user sets this without setting useAzureCredentialsChain = true as called out in the docs


@JsonProperty
private Boolean useAzureCredentialsChain = Boolean.FALSE;

@SuppressWarnings("unused") // Used by Jackson deserialization?
public void setProtocol(String protocol)
{
Expand Down Expand Up @@ -94,9 +100,25 @@ public String getSharedAccessStorageToken()
return sharedAccessStorageToken;
}

public Boolean getUseAzureCredentialsChain()
{
return useAzureCredentialsChain;
}

public String getManagedIdentityClientId()
{
return managedIdentityClientId;
}


@SuppressWarnings("unused") // Used by Jackson deserialization?
public void setSharedAccessStorageToken(String sharedAccessStorageToken)
{
this.sharedAccessStorageToken = sharedAccessStorageToken;
}

public void setUseAzureCredentialsChain(Boolean useAzureCredentialsChain)
{
this.useAzureCredentialsChain = useAzureCredentialsChain;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@

package org.apache.druid.storage.azure;

import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.io.ByteSource;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.microsoft.azure.storage.StorageException;
import org.apache.druid.java.util.common.logger.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;

/**
* Used for getting an {@link InputStream} to an azure resource.
Expand Down Expand Up @@ -62,7 +61,7 @@ public InputStream openStream(long offset) throws IOException
try {
return azureStorage.getBlockBlobInputStream(offset, containerName, blobPath);
}
catch (StorageException | URISyntaxException e) {
catch (BlobStorageException e) {
if (AzureUtils.AZURE_RETRY.apply(e)) {
throw new IOException("Recoverable exception", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.storage.azure;

import com.azure.core.http.policy.ExponentialBackoffOptions;
import com.azure.core.http.policy.RetryOptions;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;

import javax.annotation.Nonnull;
import java.time.Duration;

/**
* Factory class for generating BlobServiceClient objects.
*/
public class AzureClientFactory
{

private final AzureAccountConfig config;

public AzureClientFactory(AzureAccountConfig config)
{
this.config = config;
}

public BlobServiceClient getBlobServiceClient()
{
return getAuthenticatedBlobServiceClientBuilder().buildClient();
}

/**
* Azure doesn't let us override retryConfigs on BlobServiceClient so we need a second instance.
* @param retryCount number of retries
* @return BlobServiceClient with a custom retryCount
*/
public BlobServiceClient getRetriableBlobServiceClient(@Nonnull Integer retryCount)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason for this factory to return both a retriable and non-retriable client?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explained this in a below comment

{
BlobServiceClientBuilder clientBuilder = getAuthenticatedBlobServiceClientBuilder()
.retryOptions(new RetryOptions(
new ExponentialBackoffOptions()
.setMaxRetries(retryCount)
.setBaseDelay(Duration.ofMillis(1000))
.setMaxDelay(Duration.ofMillis(60000))
));
return clientBuilder.buildClient();
}

private BlobServiceClientBuilder getAuthenticatedBlobServiceClientBuilder()
{
BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder()
.endpoint("https://" + config.getAccount() + ".blob.core.windows.net");

if (config.getKey() != null) {
clientBuilder.credential(new StorageSharedKeyCredential(config.getAccount(), config.getKey()));
} else if (config.getSharedAccessStorageToken() != null) {
clientBuilder.sasToken(config.getSharedAccessStorageToken());
} else if (config.getUseAzureCredentialsChain()) {
// We might not use the managed identity client id in the credential chain but we can just set it here and it will no-op.
DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder()
.managedIdentityClientId(config.getManagedIdentityClientId());
clientBuilder.credential(defaultAzureCredentialBuilder.build());
}
return clientBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,12 @@

package org.apache.druid.storage.azure;

import com.azure.storage.blob.models.BlobItem;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.microsoft.azure.storage.ResultContinuation;
import com.microsoft.azure.storage.ResultSegment;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import org.apache.druid.storage.azure.blob.ListBlobItemHolder;
import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory;

import java.net.URI;
import java.util.Iterator;
Expand All @@ -42,36 +38,28 @@ public class AzureCloudBlobIterator implements Iterator<CloudBlobHolder>
{
private static final Logger log = new Logger(AzureCloudBlobIterator.class);
private final AzureStorage storage;
private final ListBlobItemHolderFactory blobItemDruidFactory;
private final Iterator<URI> prefixesIterator;
private final int maxListingLength;

private ResultSegment<ListBlobItem> result;
private String currentContainer;
private String currentPrefix;
private ResultContinuation continuationToken;
private CloudBlobHolder currentBlobItem;
private Iterator<ListBlobItem> blobItemIterator;
private Iterator<BlobItem> blobItemIterator;
private final AzureAccountConfig config;

@AssistedInject
AzureCloudBlobIterator(
AzureStorage storage,
ListBlobItemHolderFactory blobItemDruidFactory,
AzureAccountConfig config,
@Assisted final Iterable<URI> prefixes,
@Assisted final int maxListingLength
)
{
this.storage = storage;
this.blobItemDruidFactory = blobItemDruidFactory;
this.config = config;
this.prefixesIterator = prefixes.iterator();
this.maxListingLength = maxListingLength;
this.result = null;
this.currentContainer = null;
this.currentPrefix = null;
this.continuationToken = null;
this.currentBlobItem = null;
this.blobItemIterator = null;

Expand Down Expand Up @@ -108,8 +96,6 @@ private void prepareNextRequest()
log.debug("currentUri: %s\ncurrentContainer: %s\ncurrentPrefix: %s",
currentUri, currentContainer, currentPrefix
);
result = null;
continuationToken = null;
}

private void fetchNextBatch()
Expand All @@ -121,14 +107,13 @@ private void fetchNextBatch()
currentContainer,
currentPrefix
);
result = AzureUtils.retryAzureOperation(() -> storage.listBlobsWithPrefixInContainerSegmented(
// We don't need to iterate by page because the client handles this, it will fetch the next page when necessary.
blobItemIterator = storage.listBlobsWithPrefixInContainerSegmented(
currentContainer,
currentPrefix,
continuationToken,
maxListingLength
), config.getMaxTries());
continuationToken = result.getContinuationToken();
blobItemIterator = result.getResults().iterator();
maxListingLength,
config.getMaxTries()
).stream().iterator();
}
catch (Exception e) {
throw new RE(
Expand All @@ -146,19 +131,15 @@ private void fetchNextBatch()
*/
private void advanceBlobItem()
{
while (blobItemIterator.hasNext() || continuationToken != null || prefixesIterator.hasNext()) {
while (prefixesIterator.hasNext() || blobItemIterator.hasNext()) {
while (blobItemIterator.hasNext()) {
ListBlobItemHolder blobItem = blobItemDruidFactory.create(blobItemIterator.next());
/* skip directory objects */
if (blobItem.isCloudBlob() && blobItem.getCloudBlob().getBlobLength() > 0) {
currentBlobItem = blobItem.getCloudBlob();
BlobItem blobItem = blobItemIterator.next();
if (!blobItem.isPrefix() && blobItem.getProperties().getContentLength() > 0) {
currentBlobItem = new CloudBlobHolder(blobItem, currentContainer);
return;
}
}

if (continuationToken != null) {
fetchNextBatch();
} else if (prefixesIterator.hasNext()) {
if (prefixesIterator.hasNext()) {
prepareNextRequest();
fetchNextBatch();
}
Expand Down
Loading