From 289e5e299077fc77301e705bce945f77eb3c096a Mon Sep 17 00:00:00 2001 From: Ivan Savciuc Date: Wed, 8 Jan 2020 10:54:51 +0100 Subject: [PATCH 1/2] Add Kafka connectors support --- aiven/datasource_kafka_connector.go | 34 ++++ aiven/provider.go | 2 + aiven/resource_kafka_connector.go | 243 +++++++++++++++++++++++++ examples/kafka_connectors/connector.tf | 14 ++ examples/kafka_connectors/project.tf | 5 + examples/kafka_connectors/provider.tf | 4 + examples/kafka_connectors/services.tf | 44 +++++ examples/kafka_connectors/variables.tf | 2 + 8 files changed, 348 insertions(+) create mode 100644 aiven/datasource_kafka_connector.go create mode 100644 aiven/resource_kafka_connector.go create mode 100644 examples/kafka_connectors/connector.tf create mode 100644 examples/kafka_connectors/project.tf create mode 100644 examples/kafka_connectors/provider.tf create mode 100644 examples/kafka_connectors/services.tf create mode 100644 examples/kafka_connectors/variables.tf diff --git a/aiven/datasource_kafka_connector.go b/aiven/datasource_kafka_connector.go new file mode 100644 index 000000000..b741bb4eb --- /dev/null +++ b/aiven/datasource_kafka_connector.go @@ -0,0 +1,34 @@ +package aiven + +import ( + "fmt" + "github.com/aiven/aiven-go-client" + "github.com/hashicorp/terraform/helper/schema" +) + +func datasourceKafkaConnector() *schema.Resource { + return &schema.Resource{ + Read: datasourceKafkaConnectorRead, + Schema: resourceSchemaAsDatasourceSchema(aivenKafkaConnectorSchema, "project", "service_name", "connector_name"), + } +} + +func datasourceKafkaConnectorRead(d *schema.ResourceData, m interface{}) error { + projectName := d.Get("project").(string) + serviceName := d.Get("service_name").(string) + connectorName := d.Get("connector_name").(string) + + cons, err := m.(*aiven.Client).KafkaConnectors.List(projectName, serviceName) + if err != nil { + return err + } + + for _, con := range cons.Connectors { + if con.Name == connectorName { + d.SetId(buildResourceID(projectName, serviceName, connectorName)) + return resourceKafkaConnectorRead(d, m) + } + } + + return fmt.Errorf("kafka connector %s/%s not found", connectorName) +} diff --git a/aiven/provider.go b/aiven/provider.go index f1895830c..6d834c77a 100644 --- a/aiven/provider.go +++ b/aiven/provider.go @@ -28,6 +28,7 @@ func Provider() terraform.ResourceProvider { "aiven_database": datasourceDatabase(), "aiven_kafka_acl": datasourceKafkaACL(), "aiven_kafka_topic": datasourceKafkaTopic(), + "aiven_kafka_connector": datasourceKafkaConnector(), "aiven_kafka_schema": datasourceKafkaSchema(), "aiven_kafka_schema_configuration": datasourceKafkaSchemaConfiguration(), "aiven_elasticsearch_acl": datasourceElasticsearchACL(), @@ -45,6 +46,7 @@ func Provider() terraform.ResourceProvider { "aiven_database": resourceDatabase(), "aiven_kafka_acl": resourceKafkaACL(), "aiven_kafka_topic": resourceKafkaTopic(), + "aiven_kafka_connector": resourceKafkaConnector(), "aiven_kafka_schema": resourceKafkaSchema(), "aiven_kafka_schema_configuration": resourceKafkaSchemaConfiguration(), "aiven_elasticsearch_acl": resourceElasticsearchACL(), diff --git a/aiven/resource_kafka_connector.go b/aiven/resource_kafka_connector.go new file mode 100644 index 000000000..c03467771 --- /dev/null +++ b/aiven/resource_kafka_connector.go @@ -0,0 +1,243 @@ +package aiven + +import ( + "fmt" + "github.com/aiven/aiven-go-client" + "github.com/hashicorp/terraform/helper/schema" +) + +var aivenKafkaConnectorSchema = map[string]*schema.Schema{ + "project": { + Type: schema.TypeString, + Description: "Project to link the kafka connector to", + Required: true, + ForceNew: true, + }, + "service_name": { + Type: schema.TypeString, + Description: "Service to link the kafka connector to", + Required: true, + ForceNew: true, + }, + "connector_name": { + Type: schema.TypeString, + Description: "Kafka connector name", + Required: true, + ForceNew: true, + }, + "config": { + Type: schema.TypeMap, + Description: "Kafka Connector configuration parameters", + Required: true, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + }, + "plugin_author": { + Type: schema.TypeString, + Description: "Kafka connector author", + Computed: true, + }, + "plugin_class": { + Type: schema.TypeString, + Description: "Kafka connector Java class", + Computed: true, + }, + "plugin_doc_url": { + Type: schema.TypeString, + Description: "Kafka connector documentation URL", + Computed: true, + }, + "plugin_title": { + Type: schema.TypeString, + Description: "Kafka connector title", + Computed: true, + }, + "plugin_type": { + Type: schema.TypeString, + Description: "Kafka connector type", + Computed: true, + }, + "plugin_version": { + Type: schema.TypeString, + Description: "Kafka connector version", + Computed: true, + }, + "task": { + Type: schema.TypeSet, + Description: "List of tasks of a connector", + Computed: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "connector": { + Type: schema.TypeString, + Description: "Related connector name", + Computed: true, + }, + "task": { + Type: schema.TypeInt, + Description: "Task id / number", + Computed: true, + }, + }, + }, + }, +} + +func resourceKafkaConnector() *schema.Resource { + return &schema.Resource{ + Create: resourceKafkaConnectorCreate, + Read: resourceKafkaConnectorRead, + Update: resourceKafkaTConnectorUpdate, + Delete: resourceKafkaConnectorDelete, + Exists: resourceKafkaConnectorExists, + Importer: &schema.ResourceImporter{ + State: resourceKafkaConnectorState, + }, + + Schema: aivenKafkaConnectorSchema, + } +} + +func flattenKafkaConnectorTasks(r *aiven.KafkaConnector) []map[string]interface{} { + var tasks []map[string]interface{} + + for _, taskS := range r.Tasks { + task := map[string]interface{}{ + "connector": taskS.Connector, + "task": taskS.Task, + } + + tasks = append(tasks, task) + } + + return tasks +} + +func resourceKafkaConnectorRead(d *schema.ResourceData, m interface{}) error { + project, serviceName, connectorName := splitResourceID3(d.Id()) + + res, err := m.(*aiven.Client).KafkaConnectors.List(project, serviceName) + if err != nil { + return fmt.Errorf("cannot read Kafka Connector List resource %s: %s", d.Id(), err) + } + + var found bool + for _, r := range res.Connectors { + if r.Name == connectorName { + found = true + if err := d.Set("project", project); err != nil { + return fmt.Errorf("error setting Kafka Connector `project` for resource %s: %s", d.Id(), err) + } + if err := d.Set("service_name", serviceName); err != nil { + return fmt.Errorf("error setting Kafka Connector `service_name` for resource %s: %s", d.Id(), err) + } + if err := d.Set("connector_name", connectorName); err != nil { + return fmt.Errorf("error setting Kafka Connector `connector_name` for resource %s: %s", d.Id(), err) + } + if err := d.Set("config", r.Config); err != nil { + return fmt.Errorf("error setting Kafka Connector `config` for resource %s: %s", d.Id(), err) + } + if err := d.Set("plugin_author", r.Plugin.Author); err != nil { + return fmt.Errorf("error setting Kafka Connector `plugin_author` for resource %s: %s", d.Id(), err) + } + if err := d.Set("plugin_class", r.Plugin.Class); err != nil { + return fmt.Errorf("error setting Kafka Connector `plugin_class` for resource %s: %s", d.Id(), err) + } + if err := d.Set("plugin_doc_url", r.Plugin.DocumentationURL); err != nil { + return fmt.Errorf("error setting Kafka Connector `plugin_doc_url` for resource %s: %s", d.Id(), err) + } + if err := d.Set("plugin_title", r.Plugin.Title); err != nil { + return fmt.Errorf("error setting Kafka Connector `plugin_title` for resource %s: %s", d.Id(), err) + } + if err := d.Set("plugin_type", r.Plugin.Type); err != nil { + return fmt.Errorf("error setting Kafka Connector `plugin_type` for resource %s: %s", d.Id(), err) + } + if err := d.Set("plugin_version", r.Plugin.Version); err != nil { + return fmt.Errorf("error setting Kafka Connector `plugin_version` for resource %s: %s", d.Id(), err) + } + + tasks := flattenKafkaConnectorTasks(&r) + if err := d.Set("task", tasks); err != nil { + return fmt.Errorf("error setting Kafka Connector `task` array for resource %s: %s", d.Id(), err) + } + } + } + + if !found { + return fmt.Errorf("cannot read Kafka Connector resource with Id: %s not found in a Kafka Connectors list", d.Id()) + } + + return nil +} + +func resourceKafkaConnectorCreate(d *schema.ResourceData, m interface{}) error { + project := d.Get("project").(string) + serviceName := d.Get("service_name").(string) + connectorName := d.Get("connector_name").(string) + + config := make(aiven.KafkaConnectorConfig) + for k, cS := range d.Get("config").(map[string]interface{}) { + config[k] = cS.(string) + } + + err := m.(*aiven.Client).KafkaConnectors.Create(project, serviceName, config) + if err != nil { + return err + } + + d.SetId(buildResourceID(project, serviceName, connectorName)) + + return resourceKafkaConnectorRead(d, m) +} + +func resourceKafkaConnectorDelete(d *schema.ResourceData, m interface{}) error { + return m.(*aiven.Client).KafkaConnectors.Delete( + splitResourceID3(d.Id())) +} + +func resourceKafkaTConnectorUpdate(d *schema.ResourceData, m interface{}) error { + project, serviceName, connectorName := splitResourceID3(d.Id()) + + config := make(aiven.KafkaConnectorConfig) + for k, cS := range d.Get("config").(map[string]interface{}) { + config[k] = cS.(string) + } + + _, err := m.(*aiven.Client).KafkaConnectors.Update(project, serviceName, connectorName, config) + if err != nil { + return err + } + + return resourceKafkaConnectorRead(d, m) +} + +func resourceKafkaConnectorExists(d *schema.ResourceData, m interface{}) (bool, error) { + project, serviceName, connectorName := splitResourceID3(d.Id()) + + r, err := m.(*aiven.Client).KafkaConnectors.List(project, serviceName) + if err != nil { + return false, err + } + + if ok, err := resourceExists(err); err != nil { + return ok, err + } + + for _, c := range r.Connectors { + if c.Name == connectorName { + return true, nil + } + } + + return false, nil +} + +func resourceKafkaConnectorState(d *schema.ResourceData, m interface{}) ([]*schema.ResourceData, error) { + err := resourceKafkaConnectorRead(d, m) + if err != nil { + return nil, err + } + + return []*schema.ResourceData{d}, nil +} diff --git a/examples/kafka_connectors/connector.tf b/examples/kafka_connectors/connector.tf new file mode 100644 index 000000000..4c493bd22 --- /dev/null +++ b/examples/kafka_connectors/connector.tf @@ -0,0 +1,14 @@ +# Kafka connector +resource "aiven_kafka_connector" "kafka-es-con1" { + project = aiven_project.kafka-con-project1.project + service_name = aiven_service.kafka-service1.service_name + connector_name = "kafka-es-con1" + + config = { + "topics" = aiven_kafka_topic.kafka-topic1.topic_name + "connector.class" : "io.aiven.connect.elasticsearch.ElasticsearchSinkConnector" + "type.name" = "es-connector" + "name" = "kafka-es-con1" + "connection.url" = aiven_service.es-service1.service_uri + } +} \ No newline at end of file diff --git a/examples/kafka_connectors/project.tf b/examples/kafka_connectors/project.tf new file mode 100644 index 000000000..d3bb93540 --- /dev/null +++ b/examples/kafka_connectors/project.tf @@ -0,0 +1,5 @@ +# Project +resource "aiven_project" "kafka-con-project1" { + project = "kafka-con-project1" + card_id = var.aiven_card_id +} \ No newline at end of file diff --git a/examples/kafka_connectors/provider.tf b/examples/kafka_connectors/provider.tf new file mode 100644 index 000000000..695517f85 --- /dev/null +++ b/examples/kafka_connectors/provider.tf @@ -0,0 +1,4 @@ +# Initialize provider. No other config options than api_token +provider "aiven" { + api_token = var.aiven_api_token +} diff --git a/examples/kafka_connectors/services.tf b/examples/kafka_connectors/services.tf new file mode 100644 index 000000000..7da26d746 --- /dev/null +++ b/examples/kafka_connectors/services.tf @@ -0,0 +1,44 @@ +# Kafka service +resource "aiven_service" "kafka-service1" { + project = aiven_project.kafka-con-project1.project + cloud_name = "google-europe-west1" + plan = "business-4" + service_name = "kafka-service1" + service_type = "kafka" + maintenance_window_dow = "monday" + maintenance_window_time = "10:00:00" + + kafka_user_config { + // Enables Kafka Connectors + kafka_connect = true + kafka_version = "2.4" + kafka { + group_max_session_timeout_ms = 70000 + log_retention_bytes = 1000000000 + } + } +} + +# Kafka topic +resource "aiven_kafka_topic" "kafka-topic1" { + project = aiven_project.kafka-con-project1.project + service_name = aiven_service.kafka-service1.service_name + topic_name = "test-kafka-topic1" + partitions = 3 + replication = 2 +} + +# Elasticsearch service +resource "aiven_service" "es-service1" { + project = aiven_project.kafka-con-project1.project + cloud_name = "google-europe-west1" + plan = "hobbyist" + service_name = "es-service1" + service_type = "elasticsearch" + maintenance_window_dow = "monday" + maintenance_window_time = "10:00:00" + + elasticsearch_user_config { + elasticsearch_version = "7" + } +} \ No newline at end of file diff --git a/examples/kafka_connectors/variables.tf b/examples/kafka_connectors/variables.tf new file mode 100644 index 000000000..4451e9657 --- /dev/null +++ b/examples/kafka_connectors/variables.tf @@ -0,0 +1,2 @@ +variable "aiven_api_token" {} +variable "aiven_card_id" {} \ No newline at end of file From e832c8397f76592f9bb8d243d51ed9436d05e7ad Mon Sep 17 00:00:00 2001 From: Ivan Savciuc Date: Thu, 9 Jan 2020 14:07:52 +0100 Subject: [PATCH 2/2] go modules: use latest the version of go-client v1.4.2 --- README.md | 25 +++++++++++++++++++++++++ aiven/datasource_kafka_connector.go | 2 +- go.sum | 2 ++ 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 102b061f8..14a134e38 100644 --- a/README.md +++ b/README.md @@ -435,6 +435,31 @@ Allowed values: "BACKWARD", "BACKWARD_TRANSITIVE", "FORWARD", "FORWARD_TRANSITIV `schema` is Kafka Schema configuration should be a valid Avro Schema JSON format +### Resource Kafka connectors +``` +resource "aiven_kafka_connector" "kafka-es-con1" { + project = aiven_project.kafka-con-project1.project + service_name = aiven_service.kafka-service1.service_name + connector_name = "kafka-es-con1" + + config = { + "topics" = aiven_kafka_topic.kafka-topic1.topic_name + "connector.class" : "io.aiven.connect.elasticsearch.ElasticsearchSinkConnector" + "type.name" = "es-connector" + "name" = "kafka-es-con1" + "connection.url" = aiven_service.es-service1.service_uri + } +} +``` + +`project` and `service_name` define the project and service the Kafka Connectors belongs to. +They should be defined using reference as shown above to set up dependencies correctly. + +`connector_name` is the Kafka connector name + +`config` is the Kafka Connector configuration parameters, where `topics`, `connector.class` and `name` +are required parameters but the rest of them are connector type specific. + ### Resource Elasticsearch ACL ``` diff --git a/aiven/datasource_kafka_connector.go b/aiven/datasource_kafka_connector.go index b741bb4eb..ce9bf7ffe 100644 --- a/aiven/datasource_kafka_connector.go +++ b/aiven/datasource_kafka_connector.go @@ -30,5 +30,5 @@ func datasourceKafkaConnectorRead(d *schema.ResourceData, m interface{}) error { } } - return fmt.Errorf("kafka connector %s/%s not found", connectorName) + return fmt.Errorf("kafka connector %s not found", connectorName) } diff --git a/go.sum b/go.sum index 6510de4cb..737141012 100644 --- a/go.sum +++ b/go.sum @@ -56,7 +56,9 @@ github.com/chzyer/readline v0.0.0-20161106042343-c914be64f07d/go.mod h1:nSuG5e5P github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/bbolt v1.3.0/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= +github.com/coreos/etcd v3.3.10+incompatible h1:jFneRYjIvLMLhDLCzuTuU4rSJUjRplcJQ7pD7MnhC04= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=