Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Azure Goverment storage #15523

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/development/extensions-core/azure.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,6 @@ To use this Apache Druid extension, [include](../../configuration/extensions.md#
|`druid.azure.protocol`|the protocol to use|http or https|https|
|`druid.azure.maxTries`|Number of tries before canceling an Azure operation.| |3|
|`druid.azure.maxListingLength`|maximum number of input files matching a given prefix to retrieve at a time| |1024|
|`druid.azure.endpointSuffix`|The endpoint suffix to use. Override the default value to connect to [Azure Government](https://learn.microsoft.com/en-us/azure/azure-government/documentation-government-get-started-connect-to-storage#getting-started-with-storage-api).|Examples: `core.windows.net`, `core.usgovcloudapi.net`|`core.windows.net`|

See [Azure Services](http://azure.microsoft.com/en-us/pricing/free-trial/) for more information.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class AzureAccountConfig
@JsonProperty
private Boolean useAzureCredentialsChain = Boolean.FALSE;

@JsonProperty
private String endpointSuffix = AzureUtils.DEFAULT_AZURE_ENDPOINT_SUFFIX;

@SuppressWarnings("unused") // Used by Jackson deserialization?
public void setProtocol(String protocol)
{
Expand All @@ -75,6 +78,12 @@ public void setKey(String key)
this.key = key;
}

@SuppressWarnings("unused") // Used by Jackson deserialization?
public void setEndpointSuffix(String endpointSuffix)
{
this.endpointSuffix = endpointSuffix;
}

public String getProtocol()
{
return protocol;
Expand Down Expand Up @@ -121,4 +130,14 @@ public void setUseAzureCredentialsChain(Boolean useAzureCredentialsChain)
{
this.useAzureCredentialsChain = useAzureCredentialsChain;
}

public String getEndpointSuffix()
{
return endpointSuffix;
}

public String getBlobStorageEndpoint()
{
return "blob." + endpointSuffix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public BlobServiceClient getBlobServiceClient(Integer retryCount)
private BlobServiceClientBuilder getAuthenticatedBlobServiceClientBuilder()
{
BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder()
.endpoint("https://" + config.getAccount() + ".blob.core.windows.net");
.endpoint("https://" + config.getAccount() + "." + config.getBlobStorageEndpoint());

if (config.getKey() != null) {
clientBuilder.credential(new StorageSharedKeyCredential(config.getAccount(), config.getKey()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,16 @@ public class AzureDataSegmentPuller

private final AzureByteSourceFactory byteSourceFactory;

private final AzureAccountConfig azureAccountConfig;

@Inject
public AzureDataSegmentPuller(
AzureByteSourceFactory byteSourceFactory)
AzureByteSourceFactory byteSourceFactory,
AzureAccountConfig azureAccountConfig
)
{
this.byteSourceFactory = byteSourceFactory;
this.azureAccountConfig = azureAccountConfig;
}

FileUtils.FileCopyResult getSegmentFiles(
Expand All @@ -59,7 +64,7 @@ FileUtils.FileCopyResult getSegmentFiles(
"Loading container: [%s], with blobPath: [%s] and outDir: [%s]", containerName, blobPath, outDir
);

final String actualBlobPath = AzureUtils.maybeRemoveAzurePathPrefix(blobPath);
final String actualBlobPath = AzureUtils.maybeRemoveAzurePathPrefix(blobPath, azureAccountConfig.getBlobStorageEndpoint());

final ByteSource byteSource = byteSourceFactory.create(containerName, actualBlobPath);
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public String getPathForHadoop()
AzureUtils.AZURE_STORAGE_HADOOP_PROTOCOL,
segmentConfig.getContainer(),
accountConfig.getAccount(),
AzureUtils.AZURE_STORAGE_HOST_ADDRESS,
accountConfig.getBlobStorageEndpoint(),
prefixIsNullOrEmpty ? "" : StringUtils.maybeRemoveTrailingSlash(prefix) + '/'
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public class AzureStorageDruidModule implements DruidModule

public static final String SCHEME = "azure";
public static final String
STORAGE_CONNECTION_STRING_WITH_KEY = "DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s";
STORAGE_CONNECTION_STRING_WITH_KEY = "DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s;EndpointSuffix=%s;";
public static final String
STORAGE_CONNECTION_STRING_WITH_TOKEN = "DefaultEndpointsProtocol=%s;AccountName=%s;SharedAccessSignature=%s";
STORAGE_CONNECTION_STRING_WITH_TOKEN = "DefaultEndpointsProtocol=%s;AccountName=%s;SharedAccessSignature=%s;EndpointSuffix=%s;";
public static final String INDEX_ZIP_FILE_NAME = "index.zip";

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@
public class AzureUtils
{

public static final String DEFAULT_AZURE_ENDPOINT_SUFFIX = "core.windows.net";
@VisibleForTesting
static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net";

// The azure storage hadoop access pattern is:
// wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path>
// wasb[s]://<containername>@<accountname>.blob.<endpointSuffix>/<path>
// (from https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-blob-storage)
static final String AZURE_STORAGE_HADOOP_PROTOCOL = "wasbs";

Expand Down Expand Up @@ -88,14 +89,14 @@ public static String extractAzureKey(URI uri)
* @return a String representing the blob path component of the uri with any leading 'blob.core.windows.net/' string
* removed characters removed.
*/
public static String maybeRemoveAzurePathPrefix(String blobPath)
public static String maybeRemoveAzurePathPrefix(String blobPath, String blobStorageEndpointSuffix)
{
boolean blobPathIsHadoop = blobPath.contains(AZURE_STORAGE_HOST_ADDRESS);
boolean blobPathIsHadoop = blobPath.contains(blobStorageEndpointSuffix);

if (blobPathIsHadoop) {
// Remove azure's hadoop prefix to match realtime ingestion path
return blobPath.substring(
blobPath.indexOf(AZURE_STORAGE_HOST_ADDRESS) + AZURE_STORAGE_HOST_ADDRESS.length() + 1);
blobPath.indexOf(blobStorageEndpointSuffix) + blobStorageEndpointSuffix.length() + 1);
} else {
return blobPath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public void test_blobServiceClientBuilder_useAzureAccountConfig_asDefaultMaxTrie
EasyMock.expect(config.getKey()).andReturn("key").times(2);
EasyMock.expect(config.getAccount()).andReturn(ACCOUNT).times(2);
EasyMock.expect(config.getMaxTries()).andReturn(3);
EasyMock.expect(config.getBlobStorageEndpoint()).andReturn(AzureUtils.AZURE_STORAGE_HOST_ADDRESS);
azureClientFactory = new AzureClientFactory(config);
EasyMock.replay(config);
azureClientFactory.getBlobServiceClient(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ public void test_getSegmentFiles_success()
final File toDir = FileUtils.createTempDir();
try {
final InputStream zipStream = new FileInputStream(pulledFile);
final AzureAccountConfig config = new AzureAccountConfig();

EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);

replayAll();

AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory);
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory, config);

FileUtils.FileCopyResult result = puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, toDir);

Expand All @@ -92,13 +93,14 @@ public void test_getSegmentFiles_blobPathIsHadoop_success()
final File toDir = FileUtils.createTempDir();
try {
final InputStream zipStream = new FileInputStream(pulledFile);
final AzureAccountConfig config = new AzureAccountConfig();

EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);

replayAll();

AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory);
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory, config);

FileUtils.FileCopyResult result = puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH_HADOOP, toDir);

Expand All @@ -119,6 +121,7 @@ public void test_getSegmentFiles_blobPathIsHadoop_success()
public void test_getSegmentFiles_nonRecoverableErrorRaisedWhenPullingSegmentFiles_doNotDeleteOutputDirectory()
throws IOException, BlobStorageException, SegmentLoadingException
{
final AzureAccountConfig config = new AzureAccountConfig();

final File outDir = FileUtils.createTempDir();
try {
Expand All @@ -131,7 +134,7 @@ public void test_getSegmentFiles_nonRecoverableErrorRaisedWhenPullingSegmentFile

replayAll();

AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory);
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory, config);

puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, outDir);
}
Expand All @@ -149,6 +152,7 @@ public void test_getSegmentFiles_nonRecoverableErrorRaisedWhenPullingSegmentFile
public void test_getSegmentFiles_recoverableErrorRaisedWhenPullingSegmentFiles_deleteOutputDirectory()
throws IOException, BlobStorageException, SegmentLoadingException
{
final AzureAccountConfig config = new AzureAccountConfig();

final File outDir = FileUtils.createTempDir();
try {
Expand All @@ -163,7 +167,7 @@ public void test_getSegmentFiles_recoverableErrorRaisedWhenPullingSegmentFiles_d
EasyMock.replay(azureStorage);
EasyMock.replay(byteSourceFactory);

AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory);
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory, config);

puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, outDir);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,26 @@ public void testAllCredentialsUnset()
);
}

@Test
public void testGetBlobStorageEndpointWithDefaultProperties()
{
Properties properties = initializePropertes();
AzureAccountConfig config = makeInjectorWithProperties(properties).getInstance(AzureAccountConfig.class);
Assert.assertEquals(config.getEndpointSuffix(), AzureUtils.DEFAULT_AZURE_ENDPOINT_SUFFIX);
Assert.assertEquals(config.getBlobStorageEndpoint(), AzureUtils.AZURE_STORAGE_HOST_ADDRESS);
}

@Test
public void testGetBlobStorageEndpointWithCustomBlobPath()
{
Properties properties = initializePropertes();
final String customSuffix = "core.usgovcloudapi.net";
properties.setProperty("druid.azure.endpointSuffix", customSuffix);
AzureAccountConfig config = makeInjectorWithProperties(properties).getInstance(AzureAccountConfig.class);
Assert.assertEquals(config.getEndpointSuffix(), customSuffix);
Assert.assertEquals(config.getBlobStorageEndpoint(), "blob." + customSuffix);
}

private Injector makeInjectorWithProperties(final Properties props)
{
return Guice.createInjector(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ public void test_extractAzureKey_pathHasLeadingSlash_returnsPathWithLeadingSlash
@Test
public void test_maybeRemoveAzurePathPrefix_pathHasLeadingAzurePathPrefix_returnsPathWithLeadingAzurePathRemoved()
{
String path = AzureUtils.maybeRemoveAzurePathPrefix(BLOB_PATH_WITH_LEADING_AZURE_PREFIX);
String path = AzureUtils.maybeRemoveAzurePathPrefix(BLOB_PATH_WITH_LEADING_AZURE_PREFIX, AzureUtils.AZURE_STORAGE_HOST_ADDRESS);
Assert.assertEquals(BLOB_NAME, path);
}

@Test
public void test_maybeRemoveAzurePathPrefix_pathDoesNotHaveAzurePathPrefix__returnsPathWithLeadingAzurePathRemoved()
{
String path = AzureUtils.maybeRemoveAzurePathPrefix(BLOB_NAME);
String path = AzureUtils.maybeRemoveAzurePathPrefix(BLOB_NAME, AzureUtils.AZURE_STORAGE_HOST_ADDRESS);
Assert.assertEquals(BLOB_NAME, path);
}

Expand Down Expand Up @@ -206,4 +206,18 @@ public void test_azureRetry_RunTimeExceptionWrappedInRunTimeException_returnsFal
boolean retry = AzureUtils.AZURE_RETRY.apply(RUNTIME_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTON);
Assert.assertFalse(retry);
}

@Test
public void testRemoveAzurePathPrefixDefaultEndpoint()
{
String outputBlob = AzureUtils.maybeRemoveAzurePathPrefix("blob.core.windows.net/container/blob", "blob.core.windows.net");
Assert.assertEquals("container/blob", outputBlob);
}

@Test
public void testRemoveAzurePathPrefixCustomEndpoint()
{
String outputBlob = AzureUtils.maybeRemoveAzurePathPrefix("blob.core.usgovcloudapi.net/container/blob", "blob.core.usgovcloudapi.net");
Assert.assertEquals("container/blob", outputBlob);
}
}
Loading