diff --git a/docs/development/extensions-core/azure.md b/docs/development/extensions-core/azure.md index 198ef3650dc1..003f39cc5540 100644 --- a/docs/development/extensions-core/azure.md +++ b/docs/development/extensions-core/azure.md @@ -42,5 +42,6 @@ To use this Apache Druid extension, [include](../../configuration/extensions.md# |`druid.azure.protocol`|the protocol to use|http or https|https| |`druid.azure.maxTries`|Number of tries before canceling an Azure operation.| |3| |`druid.azure.maxListingLength`|maximum number of input files matching a given prefix to retrieve at a time| |1024| +|`druid.azure.endpointSuffix`|The endpoint suffix to use. Override the default value to connect to [Azure Government](https://learn.microsoft.com/en-us/azure/azure-government/documentation-government-get-started-connect-to-storage#getting-started-with-storage-api).|Examples: `core.windows.net`, `core.usgovcloudapi.net`|`core.windows.net`| See [Azure Services](http://azure.microsoft.com/en-us/pricing/free-trial/) for more information. diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureAccountConfig.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureAccountConfig.java index 5826c68f1b78..97ec3c625b58 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureAccountConfig.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureAccountConfig.java @@ -52,6 +52,9 @@ public class AzureAccountConfig @JsonProperty private Boolean useAzureCredentialsChain = Boolean.FALSE; + @JsonProperty + private String endpointSuffix = AzureUtils.DEFAULT_AZURE_ENDPOINT_SUFFIX; + @SuppressWarnings("unused") // Used by Jackson deserialization? public void setProtocol(String protocol) { @@ -75,6 +78,12 @@ public void setKey(String key) this.key = key; } + @SuppressWarnings("unused") // Used by Jackson deserialization? 
+ public void setEndpointSuffix(String endpointSuffix) + { + this.endpointSuffix = endpointSuffix; + } + public String getProtocol() { return protocol; @@ -121,4 +130,14 @@ public void setUseAzureCredentialsChain(Boolean useAzureCredentialsChain) { this.useAzureCredentialsChain = useAzureCredentialsChain; } + + public String getEndpointSuffix() + { + return endpointSuffix; + } + + public String getBlobStorageEndpoint() + { + return "blob." + endpointSuffix; + } } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java index 365b9d40dab1..3625eaa813ac 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java @@ -67,7 +67,7 @@ public BlobServiceClient getBlobServiceClient(Integer retryCount) private BlobServiceClientBuilder getAuthenticatedBlobServiceClientBuilder() { BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder() - .endpoint("https://" + config.getAccount() + ".blob.core.windows.net"); + .endpoint("https://" + config.getAccount() + "." 
+ config.getBlobStorageEndpoint()); if (config.getKey() != null) { clientBuilder.credential(new StorageSharedKeyCredential(config.getAccount(), config.getKey())); diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java index 571ecc683509..c20413b1169e 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java @@ -38,11 +38,16 @@ public class AzureDataSegmentPuller private final AzureByteSourceFactory byteSourceFactory; + private final AzureAccountConfig azureAccountConfig; + @Inject public AzureDataSegmentPuller( - AzureByteSourceFactory byteSourceFactory) + AzureByteSourceFactory byteSourceFactory, + AzureAccountConfig azureAccountConfig + ) { this.byteSourceFactory = byteSourceFactory; + this.azureAccountConfig = azureAccountConfig; } FileUtils.FileCopyResult getSegmentFiles( @@ -59,7 +64,7 @@ FileUtils.FileCopyResult getSegmentFiles( "Loading container: [%s], with blobPath: [%s] and outDir: [%s]", containerName, blobPath, outDir ); - final String actualBlobPath = AzureUtils.maybeRemoveAzurePathPrefix(blobPath); + final String actualBlobPath = AzureUtils.maybeRemoveAzurePathPrefix(blobPath, azureAccountConfig.getBlobStorageEndpoint()); final ByteSource byteSource = byteSourceFactory.create(containerName, actualBlobPath); final FileUtils.FileCopyResult result = CompressionUtils.unzip( diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java index e2ce8bbb88d5..8180b362b66b 100644 --- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java @@ -78,7 +78,7 @@ public String getPathForHadoop() AzureUtils.AZURE_STORAGE_HADOOP_PROTOCOL, segmentConfig.getContainer(), accountConfig.getAccount(), - AzureUtils.AZURE_STORAGE_HOST_ADDRESS, + accountConfig.getBlobStorageEndpoint(), prefixIsNullOrEmpty ? "" : StringUtils.maybeRemoveTrailingSlash(prefix) + '/' ); diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java index ec4ed8fa201d..25db821601b5 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java @@ -47,9 +47,9 @@ public class AzureStorageDruidModule implements DruidModule public static final String SCHEME = "azure"; public static final String - STORAGE_CONNECTION_STRING_WITH_KEY = "DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s"; + STORAGE_CONNECTION_STRING_WITH_KEY = "DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s;EndpointSuffix=%s;"; public static final String - STORAGE_CONNECTION_STRING_WITH_TOKEN = "DefaultEndpointsProtocol=%s;AccountName=%s;SharedAccessSignature=%s"; + STORAGE_CONNECTION_STRING_WITH_TOKEN = "DefaultEndpointsProtocol=%s;AccountName=%s;SharedAccessSignature=%s;EndpointSuffix=%s;"; public static final String INDEX_ZIP_FILE_NAME = "index.zip"; @Override diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java index 5309b016d5e5..0c00a633d776 100644 --- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java @@ -38,11 +38,12 @@ public class AzureUtils { + public static final String DEFAULT_AZURE_ENDPOINT_SUFFIX = "core.windows.net"; @VisibleForTesting static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net"; // The azure storage hadoop access pattern is: - // wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path> + // wasb[s]://<containername>@<accountname>.blob.<endpointSuffix>/<path> // (from https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-blob-storage) static final String AZURE_STORAGE_HADOOP_PROTOCOL = "wasbs"; @@ -88,14 +89,14 @@ public static String extractAzureKey(URI uri) * @return a String representing the blob path component of the uri with any leading 'blob.core.windows.net/' string * removed characters removed. */ - public static String maybeRemoveAzurePathPrefix(String blobPath) + public static String maybeRemoveAzurePathPrefix(String blobPath, String blobStorageEndpointSuffix) { - boolean blobPathIsHadoop = blobPath.contains(AZURE_STORAGE_HOST_ADDRESS); + boolean blobPathIsHadoop = blobPath.contains(blobStorageEndpointSuffix); if (blobPathIsHadoop) { // Remove azure's hadoop prefix to match realtime ingestion path return blobPath.substring( - blobPath.indexOf(AZURE_STORAGE_HOST_ADDRESS) + AZURE_STORAGE_HOST_ADDRESS.length() + 1); + blobPath.indexOf(blobStorageEndpointSuffix) + blobStorageEndpointSuffix.length() + 1); } else { return blobPath; } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java index ffc4a8bb8013..bbf07b402ddc 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java +++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java @@ -133,6 +133,7 @@ public void test_blobServiceClientBuilder_useAzureAccountConfig_asDefaultMaxTrie EasyMock.expect(config.getKey()).andReturn("key").times(2); EasyMock.expect(config.getAccount()).andReturn(ACCOUNT).times(2); EasyMock.expect(config.getMaxTries()).andReturn(3); + EasyMock.expect(config.getBlobStorageEndpoint()).andReturn(AzureUtils.AZURE_STORAGE_HOST_ADDRESS); azureClientFactory = new AzureClientFactory(config); EasyMock.replay(config); azureClientFactory.getBlobServiceClient(null); diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java index b3b67c4f13b4..ac851877c53b 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java @@ -60,13 +60,14 @@ public void test_getSegmentFiles_success() final File toDir = FileUtils.createTempDir(); try { final InputStream zipStream = new FileInputStream(pulledFile); + final AzureAccountConfig config = new AzureAccountConfig(); EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH)); EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream); replayAll(); - AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory); + AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory, config); FileUtils.FileCopyResult result = puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, toDir); @@ -92,13 +93,14 @@ public void test_getSegmentFiles_blobPathIsHadoop_success() final File toDir = 
FileUtils.createTempDir(); try { final InputStream zipStream = new FileInputStream(pulledFile); + final AzureAccountConfig config = new AzureAccountConfig(); EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH)); EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream); replayAll(); - AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory); + AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory, config); FileUtils.FileCopyResult result = puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH_HADOOP, toDir); @@ -119,6 +121,7 @@ public void test_getSegmentFiles_blobPathIsHadoop_success() public void test_getSegmentFiles_nonRecoverableErrorRaisedWhenPullingSegmentFiles_doNotDeleteOutputDirectory() throws IOException, BlobStorageException, SegmentLoadingException { + final AzureAccountConfig config = new AzureAccountConfig(); final File outDir = FileUtils.createTempDir(); try { @@ -131,7 +134,7 @@ public void test_getSegmentFiles_nonRecoverableErrorRaisedWhenPullingSegmentFile replayAll(); - AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory); + AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory, config); puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, outDir); } @@ -149,6 +152,7 @@ public void test_getSegmentFiles_nonRecoverableErrorRaisedWhenPullingSegmentFile public void test_getSegmentFiles_recoverableErrorRaisedWhenPullingSegmentFiles_deleteOutputDirectory() throws IOException, BlobStorageException, SegmentLoadingException { + final AzureAccountConfig config = new AzureAccountConfig(); final File outDir = FileUtils.createTempDir(); try { @@ -163,7 +167,7 @@ public void test_getSegmentFiles_recoverableErrorRaisedWhenPullingSegmentFiles_d EasyMock.replay(azureStorage); EasyMock.replay(byteSourceFactory); - AzureDataSegmentPuller puller = 
new AzureDataSegmentPuller(byteSourceFactory); + AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory, config); puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, outDir); diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java index 6d5efbae501a..27d02cd23546 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java @@ -259,6 +259,26 @@ public void testAllCredentialsUnset() ); } + @Test + public void testGetBlobStorageEndpointWithDefaultProperties() + { + Properties properties = initializePropertes(); + AzureAccountConfig config = makeInjectorWithProperties(properties).getInstance(AzureAccountConfig.class); + Assert.assertEquals(config.getEndpointSuffix(), AzureUtils.DEFAULT_AZURE_ENDPOINT_SUFFIX); + Assert.assertEquals(config.getBlobStorageEndpoint(), AzureUtils.AZURE_STORAGE_HOST_ADDRESS); + } + + @Test + public void testGetBlobStorageEndpointWithCustomBlobPath() + { + Properties properties = initializePropertes(); + final String customSuffix = "core.usgovcloudapi.net"; + properties.setProperty("druid.azure.endpointSuffix", customSuffix); + AzureAccountConfig config = makeInjectorWithProperties(properties).getInstance(AzureAccountConfig.class); + Assert.assertEquals(config.getEndpointSuffix(), customSuffix); + Assert.assertEquals(config.getBlobStorageEndpoint(), "blob." 
+ customSuffix); + } + private Injector makeInjectorWithProperties(final Properties props) { return Guice.createInjector( diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureUtilsTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureUtilsTest.java index 0222100c43ed..4a28c4de4ccc 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureUtilsTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureUtilsTest.java @@ -85,14 +85,14 @@ public void test_extractAzureKey_pathHasLeadingSlash_returnsPathWithLeadingSlash @Test public void test_maybeRemoveAzurePathPrefix_pathHasLeadingAzurePathPrefix_returnsPathWithLeadingAzurePathRemoved() { - String path = AzureUtils.maybeRemoveAzurePathPrefix(BLOB_PATH_WITH_LEADING_AZURE_PREFIX); + String path = AzureUtils.maybeRemoveAzurePathPrefix(BLOB_PATH_WITH_LEADING_AZURE_PREFIX, AzureUtils.AZURE_STORAGE_HOST_ADDRESS); Assert.assertEquals(BLOB_NAME, path); } @Test public void test_maybeRemoveAzurePathPrefix_pathDoesNotHaveAzurePathPrefix__returnsPathWithLeadingAzurePathRemoved() { - String path = AzureUtils.maybeRemoveAzurePathPrefix(BLOB_NAME); + String path = AzureUtils.maybeRemoveAzurePathPrefix(BLOB_NAME, AzureUtils.AZURE_STORAGE_HOST_ADDRESS); Assert.assertEquals(BLOB_NAME, path); } @@ -206,4 +206,18 @@ public void test_azureRetry_RunTimeExceptionWrappedInRunTimeException_returnsFal boolean retry = AzureUtils.AZURE_RETRY.apply(RUNTIME_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTON); Assert.assertFalse(retry); } + + @Test + public void testRemoveAzurePathPrefixDefaultEndpoint() + { + String outputBlob = AzureUtils.maybeRemoveAzurePathPrefix("blob.core.windows.net/container/blob", "blob.core.windows.net"); + Assert.assertEquals("container/blob", outputBlob); + } + + @Test + public void testRemoveAzurePathPrefixCustomEndpoint() + { + String outputBlob = 
AzureUtils.maybeRemoveAzurePathPrefix("blob.core.usgovcloudapi.net/container/blob", "blob.core.usgovcloudapi.net"); + Assert.assertEquals("container/blob", outputBlob); + } }