Skip to content

Commit

Permalink
Azure client upgrade to allow identity options (#15287)
Browse files Browse the repository at this point in the history
* Include new dependencies

* Mostly implemented

* More azure fixes

* Tests passing

* Unit tests running

* Test running after removing storage exception

* Happy with coverage now

* Add more tests

* fix client factory

* cleanup from testing

* Remove old client

* update docs

* Exclude from spellcheck

* Add licenses

* Fix identity version

* Save work

* Add azure clients

* add licenses

* typos

* Add dependencies

* Exception is not thrown

* Fix intellij check

* Don't need to override

* specify length

* urldecode

* encode path

* Fix checks

* Revert urlencode changes

* Urlencode with azure library

* Update docs/development/extensions-core/azure.md

Co-authored-by: Abhishek Agarwal <[email protected]>

* PR changes

* Update docs/development/extensions-core/azure.md

Co-authored-by: 317brian <[email protected]>

* Deprecate AzureTaskLogsConfig.maxRetries

* Clean up azure retry block

* logic update to reuse clients

* fix comments

* Create container conditionally

* Fix key auth

* Remove container client logic

* Add some more testing

* Update comments

* Add a comment explaining client reuse

* Move logic to factory class

* use bom for dependency management

* fix license versions

---------

Co-authored-by: Abhishek Agarwal <[email protected]>
Co-authored-by: 317brian <[email protected]>
  • Loading branch information
3 people authored Jan 3, 2024
1 parent b8060fc commit 8e95cea
Show file tree
Hide file tree
Showing 34 changed files with 1,287 additions and 1,001 deletions.
6 changes: 4 additions & 2 deletions docs/development/extensions-core/azure.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ To use this Apache Druid extension, [include](../../configuration/extensions.md#
|--------|---------------|-----------|-------|
|`druid.storage.type`|azure||Must be set.|
|`druid.azure.account`||Azure Storage account name.|Must be set.|
|`druid.azure.key`||Azure Storage account key.|Optional. Either set key or sharedAccessStorageToken but not both.|
|`druid.azure.sharedAccessStorageToken`||Azure Shared Storage access token|Optional. Either set key or sharedAccessStorageToken but not both.|
|`druid.azure.key`||Azure Storage account key.|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain.|
|`druid.azure.sharedAccessStorageToken`||Azure Shared Storage access token|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain..|
|`druid.azure.useAzureCredentialsChain`|Use [DefaultAzureCredential](https://learn.microsoft.com/en-us/java/api/overview/azure/identity-readme?view=azure-java-stable) for authentication|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain.|False|
|`druid.azure.managedIdentityClientId`|If you want to use managed identity authentication in the `DefaultAzureCredential`, `useAzureCredentialsChain` must be true.||Optional.|
|`druid.azure.container`||Azure Storage container name.|Must be set.|
|`druid.azure.prefix`|A prefix string that will be prepended to the blob names for the segments published to Azure deep storage| |""|
|`druid.azure.protocol`|the protocol to use|http or https|https|
Expand Down
57 changes: 34 additions & 23 deletions extensions-core/azure-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,43 @@
<relativePath>../../pom.xml</relativePath>
</parent>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-sdk-bom</artifactId>
<version>1.2.19</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>8.6.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob-batch</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-common</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
Expand Down Expand Up @@ -129,7 +136,11 @@
<artifactId>commons-lang</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>provided</scope>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

package org.apache.druid.data.input.azure;

import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CloudObjectInputSource;
Expand All @@ -42,7 +42,6 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -150,7 +149,7 @@ public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> pre
blob.getBlobLength()
);
}
catch (URISyntaxException | StorageException e) {
catch (BlobStorageException e) {
throw new RuntimeException(e);
}
}
Expand All @@ -161,14 +160,14 @@ public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> pre
public long getObjectSize(CloudObjectLocation location)
{
try {
final CloudBlob blobWithAttributes = storage.getBlockBlobReferenceWithAttributes(
final BlockBlobClient blobWithAttributes = storage.getBlockBlobReferenceWithAttributes(
location.getBucket(),
location.getPath()
);

return blobWithAttributes.getProperties().getLength();
return blobWithAttributes.getProperties().getBlobSize();
}
catch (URISyntaxException | StorageException e) {
catch (BlobStorageException e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public class AzureAccountConfig
@JsonProperty
private String sharedAccessStorageToken;

@JsonProperty
private String managedIdentityClientId;

@JsonProperty
private Boolean useAzureCredentialsChain = Boolean.FALSE;

@SuppressWarnings("unused") // Used by Jackson deserialization?
public void setProtocol(String protocol)
{
Expand Down Expand Up @@ -94,9 +100,25 @@ public String getSharedAccessStorageToken()
return sharedAccessStorageToken;
}

public Boolean getUseAzureCredentialsChain()
{
return useAzureCredentialsChain;
}

public String getManagedIdentityClientId()
{
return managedIdentityClientId;
}


@SuppressWarnings("unused") // Used by Jackson deserialization?
public void setSharedAccessStorageToken(String sharedAccessStorageToken)
{
this.sharedAccessStorageToken = sharedAccessStorageToken;
}

public void setUseAzureCredentialsChain(Boolean useAzureCredentialsChain)
{
this.useAzureCredentialsChain = useAzureCredentialsChain;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@

package org.apache.druid.storage.azure;

import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.io.ByteSource;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.microsoft.azure.storage.StorageException;
import org.apache.druid.java.util.common.logger.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;

/**
* Used for getting an {@link InputStream} to an azure resource.
Expand Down Expand Up @@ -62,7 +61,7 @@ public InputStream openStream(long offset) throws IOException
try {
return azureStorage.getBlockBlobInputStream(offset, containerName, blobPath);
}
catch (StorageException | URISyntaxException e) {
catch (BlobStorageException e) {
if (AzureUtils.AZURE_RETRY.apply(e)) {
throw new IOException("Recoverable exception", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.storage.azure;

import com.azure.core.http.policy.ExponentialBackoffOptions;
import com.azure.core.http.policy.RetryOptions;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/**
* Factory class for generating BlobServiceClient objects.
*/
public class AzureClientFactory
{

private final AzureAccountConfig config;
private final Map<Integer, BlobServiceClient> cachedBlobServiceClients;

public AzureClientFactory(AzureAccountConfig config)
{
this.config = config;
this.cachedBlobServiceClients = new HashMap<>();
}

// It's okay to store clients in a map here because all the configs for specifying azure retries are static, and there are only 2 of them.
// The 2 configs are AzureAccountConfig.maxTries and AzureOutputConfig.maxRetrr.
// We will only ever have at most 2 clients in cachedBlobServiceClients.
public BlobServiceClient getBlobServiceClient(Integer retryCount)
{
if (!cachedBlobServiceClients.containsKey(retryCount)) {
BlobServiceClientBuilder clientBuilder = getAuthenticatedBlobServiceClientBuilder()
.retryOptions(new RetryOptions(
new ExponentialBackoffOptions()
.setMaxRetries(retryCount != null ? retryCount : config.getMaxTries())
.setBaseDelay(Duration.ofMillis(1000))
.setMaxDelay(Duration.ofMillis(60000))
));
cachedBlobServiceClients.put(retryCount, clientBuilder.buildClient());
}

return cachedBlobServiceClients.get(retryCount);
}

private BlobServiceClientBuilder getAuthenticatedBlobServiceClientBuilder()
{
BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder()
.endpoint("https://" + config.getAccount() + ".blob.core.windows.net");

if (config.getKey() != null) {
clientBuilder.credential(new StorageSharedKeyCredential(config.getAccount(), config.getKey()));
} else if (config.getSharedAccessStorageToken() != null) {
clientBuilder.sasToken(config.getSharedAccessStorageToken());
} else if (config.getUseAzureCredentialsChain()) {
// We might not use the managed identity client id in the credential chain but we can just set it here and it will no-op.
DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder()
.managedIdentityClientId(config.getManagedIdentityClientId());
clientBuilder.credential(defaultAzureCredentialBuilder.build());
}
return clientBuilder;
}
}
Loading

0 comments on commit 8e95cea

Please sign in to comment.