feat: implement azure blob storage toolkit #53668

Open · wants to merge 4 commits into base: master
13 changes: 13 additions & 0 deletions airbyte-cdk/bulk/toolkits/load-azure-blob-storage/build.gradle
@@ -0,0 +1,13 @@
dependencies {
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')
implementation("com.azure:azure-storage-blob:12.29.0")
api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-aws')
api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-object-storage')

testImplementation("io.mockk:mockk:1.13.16")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.11.4")
Contributor @jdpgrailsdev (Feb 14, 2025):
Nit: consider making the version a constant just to make it easy to update. See

as an example.

testImplementation("org.junit.jupiter:junit-jupiter-params:5.11.4")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.11.4")

}
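A minimal sketch of the reviewer's nit above (variable names are illustrative, not part of this diff): hoist the test-dependency versions into variables so a version bump only touches one line.

// Illustrative only: hoist versions into variables for easier updates.
def junitVersion = '5.11.4'
def mockkVersion = '1.13.16'

dependencies {
    testImplementation("io.mockk:mockk:${mockkVersion}")
    testImplementation("org.junit.jupiter:junit-jupiter-api:${junitVersion}")
    testImplementation("org.junit.jupiter:junit-jupiter-params:${junitVersion}")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${junitVersion}")
}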
AzureBlobStorageSpecification.kt
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.command.azureBlobStorage

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.annotation.JsonPropertyDescription
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle

/**
* Mix-in to provide Azure Blob Storage configuration fields as properties.
*
* See [io.airbyte.cdk.load.command.DestinationConfiguration] for more details on how to use this
* interface.
*/
interface AzureBlobStorageSpecification {
@get:JsonSchemaTitle("Azure Blob Storage Account Name")
@get:JsonPropertyDescription(
"The name of the Azure Blob Storage Account. Read more <a href=\"https://learn.microsoft.com/en-gb/azure/storage/blobs/storage-blobs-introduction#storage-accounts\">here</a>."
)
@get:JsonProperty("azure_blob_storage_account_name")
@get:JsonSchemaInject(json = """{"examples":["mystorageaccount"]}""")
val azureBlobStorageAccountName: String

@get:JsonSchemaTitle("Azure Blob Storage Container Name")
@get:JsonPropertyDescription(
"The name of the Azure Blob Storage Container. Read more <a href=\"https://learn.microsoft.com/en-gb/azure/storage/blobs/storage-blobs-introduction#containers\">here</a>."
)
@get:JsonProperty("azure_blob_storage_container_name")
@get:JsonSchemaInject(json = """{"examples":["mycontainer"]}""")
val azureBlobStorageContainerName: String

@get:JsonSchemaTitle("Shared Access Signature")
@get:JsonPropertyDescription(
"A shared access signature (SAS) provides secure delegated access to resources in your storage account.. Read more <a href=\"https://learn.microsoft.com/en-gb/azure/storage/common/storage-sas-overview?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json\">here</a>"
)
@get:JsonProperty("shared_access_signature")
@get:JsonSchemaInject(
json =
"""{"examples":["a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"],"airbyte_secret": true,"always_show": true}"""
)
val azureBlobStorageSharedAccessSignature: String

fun toAzureBlobStorageConfiguration(): AzureBlobStorageConfiguration {
return AzureBlobStorageConfiguration(
azureBlobStorageAccountName,
azureBlobStorageContainerName,
azureBlobStorageSharedAccessSignature
)
}
}

data class AzureBlobStorageConfiguration(
val accountName: String,
val containerName: String,
val sharedAccessSignature: String
)

interface AzureBlobStorageConfigurationProvider {
val azureBlobStorageConfiguration: AzureBlobStorageConfiguration
}
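For orientation (not part of this diff), a destination could mix the interface above into its connector specification and derive the runtime configuration from it; a hypothetical sketch with illustrative class names:

import io.airbyte.cdk.load.command.azureBlobStorage.AzureBlobStorageConfiguration
import io.airbyte.cdk.load.command.azureBlobStorage.AzureBlobStorageConfigurationProvider
import io.airbyte.cdk.load.command.azureBlobStorage.AzureBlobStorageSpecification

// Hypothetical connector specification mixing in the Azure Blob Storage fields.
class ExampleDestinationSpecification(
    override val azureBlobStorageAccountName: String,
    override val azureBlobStorageContainerName: String,
    override val azureBlobStorageSharedAccessSignature: String,
) : AzureBlobStorageSpecification

// Hypothetical provider exposing the parsed configuration to the toolkit.
class ExampleDestinationConfiguration(spec: ExampleDestinationSpecification) :
    AzureBlobStorageConfigurationProvider {
    override val azureBlobStorageConfiguration: AzureBlobStorageConfiguration =
        spec.toAzureBlobStorageConfiguration()
}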
AzureBlobClient.kt
@@ -0,0 +1,125 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.file.azureBlobStorage

import com.azure.core.util.BinaryData
import com.azure.storage.blob.BlobServiceClient
import com.azure.storage.blob.models.BlobStorageException
import com.azure.storage.blob.models.ListBlobsOptions
import io.airbyte.cdk.load.command.azureBlobStorage.AzureBlobStorageConfiguration
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.RemoteObject
import io.airbyte.cdk.load.file.object_storage.StreamingUpload
import java.io.InputStream
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

/** Represents a single blob in Azure. */
data class AzureBlob(
override val key: String,
override val storageConfig: AzureBlobStorageConfiguration
) : RemoteObject<AzureBlobStorageConfiguration>

class AzureBlobClient(
private val serviceClient: BlobServiceClient,
private val blobConfig: AzureBlobStorageConfiguration
) : ObjectStorageClient<AzureBlob> {

/** List all blobs that start with [prefix]. We emit them as a Flow. */
override suspend fun list(prefix: String): Flow<AzureBlob> = flow {
val containerClient = serviceClient.getBlobContainerClient(blobConfig.containerName)

containerClient
.listBlobs(ListBlobsOptions().setPrefix(prefix), null)
.map { it.name }
.filter { it.startsWith(prefix) }
.forEach { emit(AzureBlob(it, blobConfig)) }
}

/** Move is not a single operation in Azure; we have to do a copy + delete. */
override suspend fun move(remoteObject: AzureBlob, toKey: String): AzureBlob {
return move(remoteObject.key, toKey)
}

override suspend fun move(key: String, toKey: String): AzureBlob {
val sourceBlob =
serviceClient.getBlobContainerClient(blobConfig.containerName).getBlobClient(key)

val destBlob =
serviceClient.getBlobContainerClient(blobConfig.containerName).getBlobClient(toKey)

// Start copy
val copyResp = destBlob.beginCopy(sourceBlob.blobUrl, null)

copyResp.waitForCompletion()

// Delete source
sourceBlob.delete()
return AzureBlob(toKey, blobConfig)
}

/** Fetch the blob as an InputStream, pass it to [block]. */
override suspend fun <U> get(key: String, block: (InputStream) -> U): U {
val blobClient =
serviceClient.getBlobContainerClient(blobConfig.containerName).getBlobClient(key)

blobClient.openInputStream().use { inputStream ->
return block(inputStream)
}
}

/** Returns the user-defined metadata on the blob. */
override suspend fun getMetadata(key: String): Map<String, String> {
val blobClient =
serviceClient.getBlobContainerClient(blobConfig.containerName).getBlobClient(key)

val props = blobClient.properties
// The Azure SDK has "metadata" as a Map<String,String>.
// If the blob doesn't exist, this can throw.
return props?.metadata ?: emptyMap()
}

/** Upload a small byte array in a single shot. */
override suspend fun put(key: String, bytes: ByteArray): AzureBlob {
val blobClient =
serviceClient.getBlobContainerClient(blobConfig.containerName).getBlobClient(key)

blobClient.upload(BinaryData.fromBytes(bytes), true)
return AzureBlob(key, blobConfig)
}

/** Delete a blob by remoteObject */
override suspend fun delete(remoteObject: AzureBlob) {
delete(remoteObject.key)
}

/** Delete a blob by key */
override suspend fun delete(key: String) {
val blobClient =
serviceClient.getBlobContainerClient(blobConfig.containerName).getBlobClient(key)
try {
blobClient.delete()
} catch (e: BlobStorageException) {
if (e.statusCode == 404) {
// ignore not-found
} else {
throw e
}
}
}

override suspend fun startStreamingUpload(
key: String,
metadata: Map<String, String>
): StreamingUpload<AzureBlob> {
val blobClient =
serviceClient
.getBlobContainerClient(blobConfig.containerName)
.getBlobClient(key)
.getBlockBlobClient()

return AzureBlobStreamingUpload(blobClient, blobConfig, metadata)
}
}
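A hypothetical smoke-test sketch (not part of this diff) showing how the suspend functions above compose; `client` would be an AzureBlobClient wired up elsewhere:

import io.airbyte.cdk.load.file.azureBlobStorage.AzureBlobClient
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Illustrative only: write, read back, inspect metadata, list, and delete a blob.
fun smokeTest(client: AzureBlobClient) = runBlocking {
    val blob = client.put("smoke-test/hello.txt", "hello".toByteArray())
    val contents = client.get(blob.key) { it.readBytes().decodeToString() }
    val metadata = client.getMetadata(blob.key)
    val keys = client.list("smoke-test/").toList().map { it.key }
    client.delete(blob)
    println("contents=$contents metadata=$metadata keys=$keys")
}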
AzureBlobStorageClientFactory.kt
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.file.azureBlobStorage

import com.azure.storage.blob.BlobServiceClientBuilder
import io.airbyte.cdk.load.command.azureBlobStorage.AzureBlobStorageConfigurationProvider
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton

@Factory
class AzureBlobStorageClientFactory(
private val azureBlobStorageConfigurationProvider: AzureBlobStorageConfigurationProvider,
) {

@Singleton
@Secondary
fun make(): AzureBlobClient {
val endpoint =
"https://${azureBlobStorageConfigurationProvider.azureBlobStorageConfiguration.accountName}.blob.core.windows.net"

val azureServiceClient =
BlobServiceClientBuilder()
.endpoint(endpoint)
.sasToken(
azureBlobStorageConfigurationProvider.azureBlobStorageConfiguration
.sharedAccessSignature
)
.buildClient()

return AzureBlobClient(
azureServiceClient,
azureBlobStorageConfigurationProvider.azureBlobStorageConfiguration
)
}
}
AzureBlobStreamingUpload.kt
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.file.azureBlobStorage

import com.azure.storage.blob.specialized.BlockBlobClient
import io.airbyte.cdk.load.command.azureBlobStorage.AzureBlobStorageConfiguration
import io.airbyte.cdk.load.file.object_storage.StreamingUpload
import io.airbyte.cdk.load.util.setOnce
import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.BufferedInputStream
import java.nio.ByteBuffer
import java.util.Base64
import java.util.concurrent.ConcurrentSkipListMap
import java.util.concurrent.atomic.AtomicBoolean

private const val BLOB_ID_PREFIX = "block"

class AzureBlobStreamingUpload(
private val blockBlobClient: BlockBlobClient,
private val config: AzureBlobStorageConfiguration,
private val metadata: Map<String, String>
) : StreamingUpload<AzureBlob> {

private val log = KotlinLogging.logger {}
private val isComplete = AtomicBoolean(false)
private val blockIds = ConcurrentSkipListMap<Int, String>()

/**
* Each part that arrives is treated as a new block. We must generate unique block IDs for each
* call (Azure requires base64-encoded strings).
*/
override suspend fun uploadPart(part: ByteArray, index: Int) {
// Human-readable id for the log; the actual Azure block id comes from generateBlockId.
val rawBlockId = "block-$index-${System.nanoTime()}"
val blockId = generateBlockId(index)

log.info { "Staging block #$index => $rawBlockId (encoded = $blockId)" }

// The stageBlock call can be done asynchronously or blocking.
// Here we use the blocking call in a coroutine context.
BufferedInputStream(part.inputStream()).use {
blockBlobClient.stageBlock(
blockId,
it,
part.size.toLong(),
)
}

// Keep track of the blocks in the order they arrived (or the index).
blockIds[index] = blockId
}

/**
* After all parts are uploaded, we finalize by committing the block list in ascending order. If
* no parts were uploaded, we skip.
*/
override suspend fun complete(): AzureBlob {
if (isComplete.setOnce()) {
if (blockIds.isEmpty()) {
log.warn {
"No blocks uploaded. Committing empty blob: ${blockBlobClient.blobName}"
}
} else {
val blockList = blockIds.values.toList()
log.info { "Committing block list for ${blockBlobClient.blobName}: $blockList" }
blockBlobClient.commitBlockList(blockList, true) // overwrite = true
}

// Set any metadata
if (metadata.isNotEmpty()) {
blockBlobClient.setMetadata(metadata)
}
}

return AzureBlob(blockBlobClient.blobName, config)
}

fun generateBlockId(index: Int): String {
// Create a fixed-size ByteBuffer to store all components
val buffer = ByteBuffer.allocate(32) // Fixed size buffer

// Write prefix (padded to 10 bytes)
BLOB_ID_PREFIX.padEnd(10, ' ').forEach { buffer.put(it.code.toByte()) }

// Write integer (padded to 10 digits)
index.toString().padStart(10, '0').forEach { buffer.put(it.code.toByte()) }

// Generate random suffix (exactly 12 chars)
val suffixChars = ('A'..'Z') + ('0'..'9')
(1..12).forEach { _ -> buffer.put(suffixChars.random().code.toByte()) }

// Encode the entire fixed-length buffer to Base64
return Base64.getEncoder().encodeToString(buffer.array())
}
}
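A hypothetical unit-test sketch (not part of this diff) for generateBlockId: Azure requires block ids to be valid Base64 and, within a single blob, all ids to have the same length, which the fixed 32-byte buffer above guarantees.

import com.azure.storage.blob.specialized.BlockBlobClient
import io.airbyte.cdk.load.command.azureBlobStorage.AzureBlobStorageConfiguration
import io.airbyte.cdk.load.file.azureBlobStorage.AzureBlobStreamingUpload
import io.mockk.mockk
import java.util.Base64

// Illustrative only: block ids should be unique, fixed-length, and decode to 32 raw bytes.
fun main() {
    val upload =
        AzureBlobStreamingUpload(
            mockk<BlockBlobClient>(),
            AzureBlobStorageConfiguration("account", "container", "sas-token"),
            emptyMap()
        )
    val ids = (0 until 5).map { upload.generateBlockId(it) }
    check(ids.toSet().size == ids.size) // unique per part index
    check(ids.map { it.length }.toSet().size == 1) // same length for every block
    check(ids.all { Base64.getDecoder().decode(it).size == 32 }) // 32-byte payload
}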