Add ability to use Confluent Cloud Kafka Cluster in Azure (closes #79)
jbeemster committed Oct 16, 2023
1 parent 0c6444f commit ae4d721
Showing 6 changed files with 181 additions and 45 deletions.
18 changes: 13 additions & 5 deletions terraform/azure/pipeline/README.md
@@ -14,18 +14,18 @@ No providers.
| Name | Source | Version |
|------|--------|---------|
| <a name="module_bad_1_eh_topic"></a> [bad\_1\_eh\_topic](#module\_bad\_1\_eh\_topic) | snowplow-devops/event-hub/azurerm | 0.1.1 |
| <a name="module_collector_eh"></a> [collector\_eh](#module\_collector\_eh) | snowplow-devops/collector-event-hub-vmss/azurerm | 0.1.1 |
| <a name="module_collector_eh"></a> [collector\_eh](#module\_collector\_eh) | snowplow-devops/collector-event-hub-vmss/azurerm | 0.2.1 |
| <a name="module_collector_lb"></a> [collector\_lb](#module\_collector\_lb) | snowplow-devops/lb/azurerm | 0.2.0 |
| <a name="module_eh_namespace"></a> [eh\_namespace](#module\_eh\_namespace) | snowplow-devops/event-hub-namespace/azurerm | 0.1.1 |
| <a name="module_enrich_eh"></a> [enrich\_eh](#module\_enrich\_eh) | snowplow-devops/enrich-event-hub-vmss/azurerm | 0.1.2 |
| <a name="module_enrich_eh"></a> [enrich\_eh](#module\_enrich\_eh) | snowplow-devops/enrich-event-hub-vmss/azurerm | 0.2.1 |
| <a name="module_enriched_eh_topic"></a> [enriched\_eh\_topic](#module\_enriched\_eh\_topic) | snowplow-devops/event-hub/azurerm | 0.1.1 |
| <a name="module_lake_loader"></a> [lake\_loader](#module\_lake\_loader) | snowplow-devops/lake-loader-vmss/azurerm | 0.1.1 |
| <a name="module_lake_loader"></a> [lake\_loader](#module\_lake\_loader) | snowplow-devops/lake-loader-vmss/azurerm | 0.2.1 |
| <a name="module_lake_storage_container"></a> [lake\_storage\_container](#module\_lake\_storage\_container) | snowplow-devops/storage-container/azurerm | 0.1.1 |
| <a name="module_raw_eh_topic"></a> [raw\_eh\_topic](#module\_raw\_eh\_topic) | snowplow-devops/event-hub/azurerm | 0.1.1 |
| <a name="module_sf_loader"></a> [sf\_loader](#module\_sf\_loader) | snowplow-devops/snowflake-loader-vmss/azurerm | 0.1.1 |
| <a name="module_sf_loader"></a> [sf\_loader](#module\_sf\_loader) | snowplow-devops/snowflake-loader-vmss/azurerm | 0.2.1 |
| <a name="module_sf_message_queue_eh_topic"></a> [sf\_message\_queue\_eh\_topic](#module\_sf\_message\_queue\_eh\_topic) | snowplow-devops/event-hub/azurerm | 0.1.1 |
| <a name="module_sf_transformer_storage_container"></a> [sf\_transformer\_storage\_container](#module\_sf\_transformer\_storage\_container) | snowplow-devops/storage-container/azurerm | 0.1.1 |
| <a name="module_sf_transformer_wrj"></a> [sf\_transformer\_wrj](#module\_sf\_transformer\_wrj) | snowplow-devops/transformer-event-hub-vmss/azurerm | 0.1.1 |
| <a name="module_sf_transformer_wrj"></a> [sf\_transformer\_wrj](#module\_sf\_transformer\_wrj) | snowplow-devops/transformer-event-hub-vmss/azurerm | 0.2.1 |
| <a name="module_storage_account"></a> [storage\_account](#module\_storage\_account) | snowplow-devops/storage-account/azurerm | 0.1.2 |

## Resources
@@ -45,6 +45,13 @@ No resources.
| <a name="input_storage_account_name"></a> [storage\_account\_name](#input\_storage\_account\_name) | The name of the Storage Account the data will be loaded into | `string` | n/a | yes |
| <a name="input_subnet_id_lb"></a> [subnet\_id\_lb](#input\_subnet\_id\_lb) | The ID of the subnet to deploy the load balancer into (e.g. collector-agw1) | `string` | n/a | yes |
| <a name="input_subnet_id_servers"></a> [subnet\_id\_servers](#input\_subnet\_id\_servers) | The ID of the subnet to deploy the servers into (e.g. pipeline1) | `string` | n/a | yes |
| <a name="input_confluent_cloud_api_key"></a> [confluent\_cloud\_api\_key](#input\_confluent\_cloud\_api\_key) | Confluent Cloud API Key | `string` | `""` | no |
| <a name="input_confluent_cloud_api_secret"></a> [confluent\_cloud\_api\_secret](#input\_confluent\_cloud\_api\_secret) | Confluent Cloud API Secret | `string` | `""` | no |
| <a name="input_confluent_cloud_bad_1_topic_name"></a> [confluent\_cloud\_bad\_1\_topic\_name](#input\_confluent\_cloud\_bad\_1\_topic\_name) | Confluent Cloud 'bad-1' topic name | `string` | `"bad-1"` | no |
| <a name="input_confluent_cloud_bootstrap_server"></a> [confluent\_cloud\_bootstrap\_server](#input\_confluent\_cloud\_bootstrap\_server) | Confluent Cloud cluster bootstrap server | `string` | `""` | no |
| <a name="input_confluent_cloud_enriched_topic_name"></a> [confluent\_cloud\_enriched\_topic\_name](#input\_confluent\_cloud\_enriched\_topic\_name) | Confluent Cloud 'enriched' topic name | `string` | `"enriched"` | no |
| <a name="input_confluent_cloud_raw_topic_name"></a> [confluent\_cloud\_raw\_topic\_name](#input\_confluent\_cloud\_raw\_topic\_name) | Confluent Cloud 'raw' topic name | `string` | `"raw"` | no |
| <a name="input_confluent_cloud_snowflake_loader_topic_name"></a> [confluent\_cloud\_snowflake\_loader\_topic\_name](#input\_confluent\_cloud\_snowflake\_loader\_topic\_name) | Confluent Cloud 'snowflake-loader' topic name | `string` | `"snowflake-loader"` | no |
| <a name="input_lake_enabled"></a> [lake\_enabled](#input\_lake\_enabled) | Whether to load all data into a Storage Container to build a data-lake based on Delta format | `bool` | `false` | no |
| <a name="input_snowflake_account"></a> [snowflake\_account](#input\_snowflake\_account) | Snowflake account to use | `string` | `""` | no |
| <a name="input_snowflake_database"></a> [snowflake\_database](#input\_snowflake\_database) | Snowflake database name | `string` | `""` | no |
@@ -57,6 +64,7 @@ No resources.
| <a name="input_snowflake_warehouse"></a> [snowflake\_warehouse](#input\_snowflake\_warehouse) | Snowflake warehouse name | `string` | `""` | no |
| <a name="input_ssl_information"></a> [ssl\_information](#input\_ssl\_information) | SSL certificate information to optionally bind to the load balancer | <pre>object({<br> enabled = bool<br> data = string<br> password = string<br> })</pre> | <pre>{<br> "data": "",<br> "enabled": false,<br> "password": ""<br>}</pre> | no |
| <a name="input_storage_account_deploy"></a> [storage\_account\_deploy](#input\_storage\_account\_deploy) | Whether this module should create a new storage account with the specified name - if the account already exists set this to false | `bool` | `true` | no |
| <a name="input_stream_type"></a> [stream\_type](#input\_stream\_type) | The stream type to use as the Kafka Cluster between components (options: azure\_event\_hubs, confluent\_cloud) | `string` | `"azure_event_hubs"` | no |
| <a name="input_tags"></a> [tags](#input\_tags) | The tags to append to the resources in this module | `map(string)` | `{}` | no |
| <a name="input_telemetry_enabled"></a> [telemetry\_enabled](#input\_telemetry\_enabled) | Whether or not to send telemetry information back to Snowplow Analytics Ltd | `bool` | `true` | no |
| <a name="input_user_provided_id"></a> [user\_provided\_id](#input\_user\_provided\_id) | An optional unique identifier to identify the telemetry events emitted by this stack | `string` | `""` | no |
73 changes: 54 additions & 19 deletions terraform/azure/pipeline/main.tf
@@ -24,13 +24,21 @@ module "storage_account" {

locals {
storage_account_name = var.storage_account_deploy ? join("", module.storage_account.*.name) : var.storage_account_name

# Note: as the only options are EventHubs or Confluent Cloud, we want to default to EventHubs
# unless Confluent Cloud is *explicitly selected*.
#
# This logic will need to change should we support multiple streaming options.
use_azure_event_hubs = var.stream_type != "confluent_cloud"
}
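Because `stream_type` is a plain string, a typo such as `confluent-cloud` would silently fall back to Event Hubs under the logic above. A variable validation block (a sketch, not part of this commit) would surface that at plan time:

```hcl
variable "stream_type" {
  description = "The stream type to use as the Kafka Cluster between components"
  type        = string
  default     = "azure_event_hubs"

  validation {
    # Fail fast on anything other than the two supported values
    condition     = contains(["azure_event_hubs", "confluent_cloud"], var.stream_type)
    error_message = "stream_type must be one of: azure_event_hubs, confluent_cloud."
  }
}
```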

# 1. Deploy EventHubs topics
module "eh_namespace" {
source = "snowplow-devops/event-hub-namespace/azurerm"
version = "0.1.1"

count = local.use_azure_event_hubs ? 1 : 0

name = "${var.prefix}-namespace"
resource_group_name = var.resource_group_name

@@ -41,30 +49,49 @@ module "raw_eh_topic" {
source = "snowplow-devops/event-hub/azurerm"
version = "0.1.1"

count = local.use_azure_event_hubs ? 1 : 0

name = "raw-topic"
namespace_name = module.eh_namespace.name
namespace_name = join("", module.eh_namespace.*.name)
resource_group_name = var.resource_group_name
}

module "bad_1_eh_topic" {
source = "snowplow-devops/event-hub/azurerm"
version = "0.1.1"

count = local.use_azure_event_hubs ? 1 : 0

name = "bad-1-topic"
namespace_name = module.eh_namespace.name
namespace_name = join("", module.eh_namespace.*.name)
resource_group_name = var.resource_group_name
}

module "enriched_eh_topic" {
source = "snowplow-devops/event-hub/azurerm"
version = "0.1.1"

count = local.use_azure_event_hubs ? 1 : 0

name = "enriched-topic"
namespace_name = module.eh_namespace.name
namespace_name = join("", module.eh_namespace.*.name)
resource_group_name = var.resource_group_name
}

# 2. Deploy Collector stack
# 2. Figure out which Kafka Cluster to use

locals {
kafka_brokers = local.use_azure_event_hubs ? join("", module.eh_namespace.*.broker) : var.confluent_cloud_bootstrap_server
kafka_username = local.use_azure_event_hubs ? "$ConnectionString" : var.confluent_cloud_api_key

eh_namespace_name = local.use_azure_event_hubs ? join("", module.eh_namespace.*.name) : ""

raw_topic_name = local.use_azure_event_hubs ? join("", module.raw_eh_topic.*.name) : var.confluent_cloud_raw_topic_name
bad_1_topic_name = local.use_azure_event_hubs ? join("", module.bad_1_eh_topic.*.name) : var.confluent_cloud_bad_1_topic_name
enriched_topic_name = local.use_azure_event_hubs ? join("", module.enriched_eh_topic.*.name) : var.confluent_cloud_enriched_topic_name
}
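The repeated `join("", module.X.*.output)` idiom is how outputs are read from count-gated modules: the splat expression yields a zero- or one-element list, so `join` collapses it to either the single value or an empty string instead of failing with an index error when the module is not deployed. A standalone sketch of the pattern (module and output names are hypothetical):

```hcl
module "example" {
  source = "./modules/example" # hypothetical module exposing an output named "name"
  count  = var.enabled ? 1 : 0
}

locals {
  # "" when count = 0, the module's name output when count = 1
  example_name = join("", module.example.*.name)
}
```

On Terraform 0.15 and later, `one(module.example[*].name)` expresses the same intent, returning `null` rather than `""` for the empty case.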

# 3. Deploy Collector stack
module "collector_lb" {
source = "snowplow-devops/lb/azurerm"
version = "0.2.0"
@@ -84,7 +111,7 @@ module "collector_lb" {

module "collector_eh" {
source = "snowplow-devops/collector-event-hub-vmss/azurerm"
version = "0.1.1"
version = "0.2.1"

name = "${var.prefix}-collector"
resource_group_name = var.resource_group_name
@@ -97,21 +124,24 @@ module "collector_eh" {
ssh_public_key = var.ssh_public_key
ssh_ip_allowlist = var.ssh_ip_allowlist

good_topic_name = module.raw_eh_topic.name
bad_topic_name = module.bad_1_eh_topic.name
eh_namespace_broker = module.eh_namespace.broker
eh_namespace_read_write_connection_string = module.eh_namespace.read_write_primary_connection_string
good_topic_name = local.raw_topic_name
bad_topic_name = local.bad_1_topic_name
kafka_brokers = local.kafka_brokers
kafka_username = local.kafka_username
kafka_password = local.use_azure_event_hubs ? join("", module.eh_namespace.*.read_write_primary_connection_string) : var.confluent_cloud_api_secret

kafka_source = var.stream_type

telemetry_enabled = var.telemetry_enabled
user_provided_id = var.user_provided_id

tags = var.tags
}

# 3. Deploy Enrich stack
# 4. Deploy Enrich stack
module "enrich_eh" {
source = "snowplow-devops/enrich-event-hub-vmss/azurerm"
version = "0.1.2"
version = "0.2.1"

name = "${var.prefix}-enrich"
resource_group_name = var.resource_group_name
@@ -120,14 +150,19 @@ module "enrich_eh" {
ssh_public_key = var.ssh_public_key
ssh_ip_allowlist = var.ssh_ip_allowlist

raw_topic_name = module.raw_eh_topic.name
raw_topic_connection_string = module.raw_eh_topic.read_only_primary_connection_string
good_topic_name = module.enriched_eh_topic.name
good_topic_connection_string = module.enriched_eh_topic.read_write_primary_connection_string
bad_topic_name = module.bad_1_eh_topic.name
bad_topic_connection_string = module.bad_1_eh_topic.read_write_primary_connection_string
eh_namespace_name = module.eh_namespace.name
eh_namespace_broker = module.eh_namespace.broker
raw_topic_name = local.raw_topic_name
raw_topic_kafka_username = local.kafka_username
raw_topic_kafka_password = local.use_azure_event_hubs ? join("", module.raw_eh_topic.*.read_only_primary_connection_string) : var.confluent_cloud_api_secret
good_topic_name = local.enriched_topic_name
good_topic_kafka_username = local.kafka_username
good_topic_kafka_password = local.use_azure_event_hubs ? join("", module.enriched_eh_topic.*.read_write_primary_connection_string) : var.confluent_cloud_api_secret
bad_topic_name = local.bad_1_topic_name
bad_topic_kafka_username = local.kafka_username
bad_topic_kafka_password = local.use_azure_event_hubs ? join("", module.bad_1_eh_topic.*.read_write_primary_connection_string) : var.confluent_cloud_api_secret
eh_namespace_name = local.eh_namespace_name
kafka_brokers = local.kafka_brokers

kafka_source = var.stream_type

telemetry_enabled = var.telemetry_enabled
user_provided_id = var.user_provided_id
18 changes: 11 additions & 7 deletions terraform/azure/pipeline/target_lake_loader.tf
@@ -10,20 +10,24 @@ module "lake_storage_container" {

module "lake_loader" {
source = "snowplow-devops/lake-loader-vmss/azurerm"
version = "0.1.1"
version = "0.2.1"

count = var.lake_enabled ? 1 : 0

name = "${var.prefix}-lake-loader"
resource_group_name = var.resource_group_name
subnet_id = var.subnet_id_servers

enriched_topic_name = module.enriched_eh_topic.name
enriched_topic_connection_string = module.enriched_eh_topic.read_only_primary_connection_string
bad_topic_name = module.bad_1_eh_topic.name
bad_topic_connection_string = module.bad_1_eh_topic.read_write_primary_connection_string
eh_namespace_name = module.eh_namespace.name
eh_namespace_broker = module.eh_namespace.broker
enriched_topic_name = local.enriched_topic_name
enriched_topic_kafka_username = local.kafka_username
enriched_topic_kafka_password = local.use_azure_event_hubs ? join("", module.enriched_eh_topic.*.read_only_primary_connection_string) : var.confluent_cloud_api_secret
bad_topic_name = local.bad_1_topic_name
bad_topic_kafka_username = local.kafka_username
bad_topic_kafka_password = local.use_azure_event_hubs ? join("", module.bad_1_eh_topic.*.read_write_primary_connection_string) : var.confluent_cloud_api_secret
eh_namespace_name = local.eh_namespace_name
kafka_brokers = local.kafka_brokers

kafka_source = var.stream_type

storage_account_name = local.storage_account_name
storage_container_name = module.lake_storage_container[0].name
39 changes: 25 additions & 14 deletions terraform/azure/pipeline/target_snowflake.tf
@@ -2,13 +2,17 @@ module "sf_message_queue_eh_topic" {
source = "snowplow-devops/event-hub/azurerm"
version = "0.1.1"

count = var.snowflake_enabled ? 1 : 0
count = local.use_azure_event_hubs && var.snowflake_enabled ? 1 : 0

name = "snowflake-loader-topic"
namespace_name = module.eh_namespace.name
namespace_name = join("", module.eh_namespace.*.name)
resource_group_name = var.resource_group_name
}

locals {
snowflake_loader_topic_name = local.use_azure_event_hubs ? join("", module.sf_message_queue_eh_topic.*.name) : var.confluent_cloud_snowflake_loader_topic_name
}

module "sf_transformer_storage_container" {
source = "snowplow-devops/storage-container/azurerm"
version = "0.1.1"
@@ -21,20 +25,24 @@ module "sf_transformer_storage_container" {

module "sf_transformer_wrj" {
source = "snowplow-devops/transformer-event-hub-vmss/azurerm"
version = "0.1.1"
version = "0.2.1"

count = var.snowflake_enabled ? 1 : 0

name = "${var.prefix}-snowflake-transformer"
resource_group_name = var.resource_group_name
subnet_id = var.subnet_id_servers

enriched_topic_name = module.enriched_eh_topic.name
enriched_topic_connection_string = module.enriched_eh_topic.read_only_primary_connection_string
queue_topic_name = module.sf_message_queue_eh_topic[0].name
queue_topic_connection_string = module.sf_message_queue_eh_topic[0].read_write_primary_connection_string
eh_namespace_name = module.eh_namespace.name
eh_namespace_broker = module.eh_namespace.broker
enriched_topic_name = local.enriched_topic_name
enriched_topic_kafka_username = local.kafka_username
enriched_topic_kafka_password = local.use_azure_event_hubs ? join("", module.enriched_eh_topic.*.read_only_primary_connection_string) : var.confluent_cloud_api_secret
queue_topic_name = local.snowflake_loader_topic_name
queue_topic_kafka_username = local.kafka_username
queue_topic_kafka_password = local.use_azure_event_hubs ? join("", module.sf_message_queue_eh_topic.*.read_write_primary_connection_string) : var.confluent_cloud_api_secret
eh_namespace_name = local.eh_namespace_name
kafka_brokers = local.kafka_brokers

kafka_source = var.stream_type

storage_account_name = local.storage_account_name
storage_container_name = module.sf_transformer_storage_container[0].name
@@ -57,18 +65,21 @@ module "sf_transformer_wrj" {

module "sf_loader" {
source = "snowplow-devops/snowflake-loader-vmss/azurerm"
version = "0.1.1"
version = "0.2.1"

count = var.snowflake_enabled ? 1 : 0

name = "${var.prefix}-snowflake-loader"
resource_group_name = var.resource_group_name
subnet_id = var.subnet_id_servers

queue_topic_name = module.sf_message_queue_eh_topic[0].name
queue_topic_connection_string = module.sf_message_queue_eh_topic[0].read_only_primary_connection_string
eh_namespace_name = module.eh_namespace.name
eh_namespace_broker = module.eh_namespace.broker
queue_topic_name = local.snowflake_loader_topic_name
queue_topic_kafka_username = local.kafka_username
queue_topic_kafka_password = local.use_azure_event_hubs ? join("", module.sf_message_queue_eh_topic.*.read_only_primary_connection_string) : var.confluent_cloud_api_secret
eh_namespace_name = local.eh_namespace_name
kafka_brokers = local.kafka_brokers

kafka_source = var.stream_type

storage_account_name = local.storage_account_name
storage_container_name_for_transformer_output = module.sf_transformer_storage_container[0].name
24 changes: 24 additions & 0 deletions terraform/azure/pipeline/terraform.tfvars
@@ -36,6 +36,30 @@ ssl_information = {
enabled = false
}

# --- Stream Selection

# The stream type to use between deployed components:
#
# 1. azure_event_hubs: If selected, deploys a namespace and topics into the same resource
#    group as all other assets (no extra steps required)
# 2. confluent_cloud: If selected, you will need to manually deploy a cluster and associated
#    topics for the applications to stream data into
stream_type = "azure_event_hubs"

# --- Stream: Confluent Cloud
# API Key details for your deployed cluster
confluent_cloud_api_key = ""
confluent_cloud_api_secret = ""

# Bootstrap server for your deployed cluster
confluent_cloud_bootstrap_server = ""

# Names of the created topics within the deployed cluster
confluent_cloud_raw_topic_name = "raw"
confluent_cloud_enriched_topic_name = "enriched"
confluent_cloud_bad_1_topic_name = "bad-1"
confluent_cloud_snowflake_loader_topic_name = "snowflake-loader"
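For comparison, a hypothetical Confluent Cloud configuration of this file might look like the following (every value below is a placeholder):

```hcl
stream_type = "confluent_cloud"

confluent_cloud_api_key          = "CCLOUD-API-KEY"
confluent_cloud_api_secret       = "CCLOUD-API-SECRET"
confluent_cloud_bootstrap_server = "pkc-xxxxx.westeurope.azure.confluent.cloud:9092"
```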

# --- Target: Snowflake
# Follow the guide to get input values for the loader:
# https://docs.snowplow.io/docs/getting-started-on-snowplow-open-source/quick-start/