diff --git a/cli/serve.go b/cli/serve.go index 51d334b7..ca260bf0 100644 --- a/cli/serve.go +++ b/cli/serve.go @@ -16,6 +16,7 @@ import ( "github.com/goto/entropy/modules" "github.com/goto/entropy/modules/firehose" "github.com/goto/entropy/modules/job" + "github.com/goto/entropy/modules/kafka" "github.com/goto/entropy/modules/kubernetes" "github.com/goto/entropy/pkg/logger" "github.com/goto/entropy/pkg/telemetry" @@ -90,6 +91,7 @@ func setupRegistry() module.Registry { kubernetes.Module, firehose.Module, job.Module, + kafka.Module, } registry := &modules.Registry{} diff --git a/internal/store/postgres/resource_model.go b/internal/store/postgres/resource_model.go index 75207ce3..18876394 100644 --- a/internal/store/postgres/resource_model.go +++ b/internal/store/postgres/resource_model.go @@ -14,8 +14,8 @@ import ( ) const listResourceByFilterQuery = `SELECT r.id, r.urn, r.kind, r.name, r.project, r.created_at, r.updated_at, r.state_status, r.state_output, r.state_module_data, r.state_next_sync, r.state_sync_result, r.created_by, r.updated_by, - array_agg(rt.tag)::text[] AS tags, - jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies + COALESCE(NULLIF(array_agg(rt.tag), '{NULL}'), '{}')::text[] AS tags, + jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies FROM resources r LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id LEFT JOIN resources d ON rd.depends_on = d.id @@ -28,8 +28,8 @@ OFFSET $4 ` const listResourceWithSpecConfigsByFilterQuery = `SELECT r.id, r.urn, r.kind, r.name, r.project, r.created_at, r.updated_at, r.spec_configs, r.state_status, r.state_output, r.state_module_data, r.state_next_sync, r.state_sync_result, r.created_by, r.updated_by, - array_agg(rt.tag)::text[] AS tags, - jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies + COALESCE(NULLIF(array_agg(rt.tag), '{NULL}'), '{}')::text[] AS tags, + jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies FROM resources r LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id LEFT JOIN resources d ON rd.depends_on = d.id diff --git a/modules/kafka/config.go b/modules/kafka/config.go new file mode 100644 index 00000000..5e57761c --- /dev/null +++ b/modules/kafka/config.go @@ -0,0 +1,74 @@ +package kafka + +import ( + _ "embed" + "encoding/json" + + "github.com/goto/entropy/core/resource" + "github.com/goto/entropy/pkg/errors" + "github.com/goto/entropy/pkg/validator" +) + +var ( + //go:embed schema/config.json + configSchemaRaw []byte + validateConfig = validator.FromJSONSchema(configSchemaRaw) +) + +type Config struct { + Entity string `json:"entity,omitempty"` + Environment string `json:"environment,omitempty"` + Landscape string `json:"landscape,omitempty"` + Organization string `json:"organization,omitempty"` + AdvertiseMode AdvertiseMode `json:"advertise_mode"` + Brokers []Broker `json:"brokers,omitempty"` + Type string `json:"type"` +} + +type AdvertiseMode struct { + Host string `json:"host"` + Address string `json:"address"` +} + +type Broker struct { + Name string `json:"name"` + Host string `json:"host"` + Address string `json:"address"` +} + +func readConfig(res resource.Resource, confJSON json.RawMessage, dc driverConf) (*Config, error) { + var resCfg, cfg Config + + if err := json.Unmarshal(confJSON, &cfg); err != nil { + return nil, errors.ErrInvalid.WithMsgf("failed to unmarshal").WithCausef(err.Error()) + } + + if res.Spec.Configs != nil { + if err := json.Unmarshal(res.Spec.Configs, &resCfg); err != nil { + return nil, errors.ErrInvalid.WithMsgf("failed to unmarshal").WithCausef(err.Error()) + } + } + + if cfg.Type == "" { + if resCfg.Type != "" { + cfg.Type = resCfg.Type + } else { + cfg.Type = dc.Type + } + } + + if cfg.Brokers == nil { + cfg.Brokers = resCfg.Brokers + } + + newConfJSON, err := json.Marshal(cfg) + if err != nil { + return nil, errors.ErrInvalid.WithMsgf("failed to marshal").WithCausef(err.Error()) + } + + if err := validateConfig(newConfJSON); err != nil { + return nil, err + } + + return &cfg, nil +} diff --git a/modules/kafka/driver.go b/modules/kafka/driver.go new file mode 100644 index 00000000..852fd362 --- /dev/null +++ b/modules/kafka/driver.go @@ -0,0 +1,94 @@ +package kafka + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/core/resource" + "github.com/goto/entropy/modules" +) + +var defaultDriverConf = driverConf{ + Type: "source", +} + +type kafkaDriver struct { + conf driverConf +} + +type Output struct { + URL string `json:"url"` +} + +type driverConf struct { + Type string `json:"type"` +} + +func (m *kafkaDriver) Plan(ctx context.Context, res module.ExpandedResource, + act module.ActionRequest, +) (*resource.Resource, error) { + cfg, err := readConfig(res.Resource, act.Params, m.conf) + if err != nil { + return nil, err + } + + res.Resource.Spec = resource.Spec{ + Configs: modules.MustJSON(cfg), + Dependencies: nil, + } + + res.Resource.State = resource.State{ + Status: resource.StatusCompleted, + Output: modules.MustJSON(Output{ + URL: mapUrl(cfg), + }), + } + + return &res.Resource, nil +} + +func (*kafkaDriver) Sync(_ context.Context, res module.ExpandedResource) (*resource.State, error) { + return &resource.State{ + Status: resource.StatusCompleted, + Output: res.Resource.State.Output, + ModuleData: nil, + }, nil +} + +func (m *kafkaDriver) Output(ctx context.Context, res module.ExpandedResource) (json.RawMessage, error) { + cfg, err := readConfig(res.Resource, res.Resource.Spec.Configs, m.conf) + if err != nil { + return nil, err + } + + return modules.MustJSON(Output{ + URL: mapUrl(cfg), + }), nil +} + +func mapUrl(cfg *Config) string { + var mode, port string + if cfg.AdvertiseMode.Address != "" { + mode = "address" + port = cfg.AdvertiseMode.Address + } else { + mode = "host" + port = cfg.AdvertiseMode.Host + } + + var urls []string + for _, broker := range cfg.Brokers { + var addr string + if mode == "address" { + addr = broker.Address + } else { + addr = broker.Host + } + urls = append(urls, fmt.Sprintf("%s:%s", addr, port)) + } + + return strings.Join(urls, ",") +} diff --git a/modules/kafka/module.go b/modules/kafka/module.go new file mode 100644 index 00000000..72440d39 --- /dev/null +++ b/modules/kafka/module.go @@ -0,0 +1,24 @@ +package kafka + +import ( + "encoding/json" + + "github.com/goto/entropy/core/module" +) + +var Module = module.Descriptor{ + Kind: "kafka", + Actions: []module.ActionDesc{ + { + Name: module.CreateAction, + }, + { + Name: module.UpdateAction, + }, + }, + DriverFactory: func(_ json.RawMessage) (module.Driver, error) { + return &kafkaDriver{ + conf: defaultDriverConf, + }, nil + }, +} diff --git a/modules/kafka/schema/config.json b/modules/kafka/schema/config.json new file mode 100644 index 00000000..3977ae43 --- /dev/null +++ b/modules/kafka/schema/config.json @@ -0,0 +1,53 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["type", "brokers"], + "properties": { + "type": { + "type": "string" + }, + "advertise_mode": { + "type": "object", + "additionalProperties": true, + "properties": { + "host": { + "type": "string" + }, + "address": { + "type": "string" + } + } + }, + "brokers": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { + "type": "string" + }, + "host": { + "type": "string" + }, + "address": { + "type": "string" + } + }, + "required": ["name", "host", "address"] + } + }, + "entity": { + "type": "string" + }, + "environment": { + "type": "string" + }, + "landscape": { + "type": "string" + }, + "organization": { + "type": "string" + } + } +} \ No newline at end of file diff --git a/test/e2e_test/firehose_test.go b/test/e2e_test/firehose_test.go index 3d95f37d..f63033bf 100644 --- a/test/e2e_test/firehose_test.go +++ b/test/e2e_test/firehose_test.go @@ -32,7 +32,7 @@ type FirehoseTestSuite struct { } func (s *FirehoseTestSuite) SetupTest() { - s.ctx, s.moduleClient, s.resourceClient, s.appConfig, s.pool, s.resource, s.kubeProvider, s.cancelModuleClient, s.cancelResourceClient, s.cancel = testbench.SetupTests(s.T(), true) + s.ctx, s.moduleClient, s.resourceClient, s.appConfig, s.pool, s.resource, s.kubeProvider, s.cancelModuleClient, s.cancelResourceClient, s.cancel = testbench.SetupTests(s.T(), true, true) modules, err := s.moduleClient.ListModules(s.ctx, &entropyv1beta1.ListModulesRequest{}) s.Require().NoError(err) diff --git a/test/e2e_test/kafka_test.go b/test/e2e_test/kafka_test.go new file mode 100644 index 00000000..ce28239a --- /dev/null +++ b/test/e2e_test/kafka_test.go @@ -0,0 +1,119 @@ +package e2e_test + +import ( + "context" + "encoding/json" + "os" + "testing" + + entropyv1beta1 "github.com/goto/entropy/proto/gotocompany/entropy/v1beta1" + "github.com/goto/entropy/test/testbench" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/suite" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/structpb" +) + +type KafkaTestSuite struct { + suite.Suite + ctx context.Context + moduleClient entropyv1beta1.ModuleServiceClient + resourceClient entropyv1beta1.ResourceServiceClient + cancelModuleClient func() + cancelResourceClient func() + cancel func() + resource *dockertest.Resource + pool *dockertest.Pool +} + +func (s *KafkaTestSuite) SetupTest() { + s.ctx, s.moduleClient, s.resourceClient, _, s.pool, s.resource, _, s.cancelModuleClient, s.cancelResourceClient, s.cancel = testbench.SetupTests(s.T(), false, false) +} + +func (s *KafkaTestSuite) TestKafka() { + s.Run("create kafka module return success", func() { + moduleData, err := os.ReadFile(testbench.TestDataPath + "/module/kafka_module.json") + if err != nil { + s.T().Fatal(err) + } + + var moduleConfig *entropyv1beta1.Module + err = json.Unmarshal(moduleData, &moduleConfig) + if err != nil { + s.T().Fatal(err) + } + _, err = s.moduleClient.CreateModule(s.ctx, &entropyv1beta1.CreateModuleRequest{ + Module: moduleConfig, + }) + s.Require().NoError(err) + }) + + s.Run("create kafka with invalid config will return invalid error", func() { + _, err := s.resourceClient.CreateResource(s.ctx, &entropyv1beta1.CreateResourceRequest{ + Resource: &entropyv1beta1.Resource{ + Name: "test-kafka", + Project: "test-project", + Kind: "kafka", + Spec: &entropyv1beta1.ResourceSpec{ + Configs: structpb.NewStringValue("{}"), + Dependencies: []*entropyv1beta1.ResourceDependency{}, + }, + }, + }) + s.Assert().Equal(codes.InvalidArgument, status.Convert(err).Code()) + }) + + s.Run("create kafka with right config will return success", func() { + resourceData, err := os.ReadFile(testbench.TestDataPath + "/resource/kafka_resource.json") + if err != nil { + s.T().Fatal(err) + } + + var resourceConfig *entropyv1beta1.Resource + err = json.Unmarshal(resourceData, &resourceConfig) + if err != nil { + s.T().Fatal(err) + } + + _, err = s.resourceClient.CreateResource(s.ctx, &entropyv1beta1.CreateResourceRequest{ + Resource: resourceConfig, + }) + s.Require().NoError(err) + }) + + resources, err := s.resourceClient.ListResources(s.ctx, &entropyv1beta1.ListResourcesRequest{ + Kind: "kafka", + }) + s.Require().NoError(err) + s.Require().Equal(1, len(resources.GetResources())) + + s.Run("update kafka with right config will return success", func() { + resourceData, err := os.ReadFile(testbench.TestDataPath + "/resource/kafka_resource.json") + if err != nil { + s.T().Fatal(err) + } + + var resourceConfig *entropyv1beta1.Resource + err = json.Unmarshal(resourceData, &resourceConfig) + if err != nil { + s.T().Fatal(err) + } + + _, err = s.resourceClient.UpdateResource(s.ctx, &entropyv1beta1.UpdateResourceRequest{ + Urn: resources.GetResources()[0].Urn, + NewSpec: resourceConfig.Spec, + }) + s.Require().NoError(err) + }) +} + +func (s *KafkaTestSuite) TearDownTest() { + if err := s.pool.Purge(s.resource); err != nil { + s.T().Fatal(err) + } +} + +func TestKafkaTestSuite(t *testing.T) { + suite.Run(t, new(KafkaTestSuite)) +} diff --git a/test/e2e_test/worker_test.go b/test/e2e_test/worker_test.go index 50ac6f17..b22951ba 100644 --- a/test/e2e_test/worker_test.go +++ b/test/e2e_test/worker_test.go @@ -30,7 +30,7 @@ type WorkerTestSuite struct { } func (s *WorkerTestSuite) SetupTest() { - s.ctx, s.moduleClient, s.resourceClient, s.appConfig, s.pool, s.resource, s.kubeProvider, s.cancelModuleClient, s.cancelResourceClient, s.cancel = testbench.SetupTests(s.T(), false) + s.ctx, s.moduleClient, s.resourceClient, s.appConfig, s.pool, s.resource, s.kubeProvider, s.cancelModuleClient, s.cancelResourceClient, s.cancel = testbench.SetupTests(s.T(), false, true) modules, err := s.moduleClient.ListModules(s.ctx, &entropyv1beta1.ListModulesRequest{}) s.Require().NoError(err) diff --git a/test/testbench/test_data/module/kafka_module.json b/test/testbench/test_data/module/kafka_module.json new file mode 100644 index 00000000..ee00f8d2 --- /dev/null +++ b/test/testbench/test_data/module/kafka_module.json @@ -0,0 +1,5 @@ +{ + "name": "kafka", + "project": "test-project", + "configs": {} +} \ No newline at end of file diff --git a/test/testbench/test_data/module/kubernetes_module.json b/test/testbench/test_data/module/kubernetes_module.json index 93380269..01d4dd79 100644 --- a/test/testbench/test_data/module/kubernetes_module.json +++ b/test/testbench/test_data/module/kubernetes_module.json @@ -2,4 +2,4 @@ "name": "kubernetes", "project": "test-project", "configs": {} - } \ No newline at end of file +} \ No newline at end of file diff --git a/test/testbench/test_data/resource/kafka_resource.json b/test/testbench/test_data/resource/kafka_resource.json new file mode 100644 index 00000000..3a3672a5 --- /dev/null +++ b/test/testbench/test_data/resource/kafka_resource.json @@ -0,0 +1,25 @@ +{ + "kind": "kafka", + "name": "test-kafka", + "project": "test-project", + "labels": { + "description": "test kafka resource" + }, + "spec": { + "configs": { + "advertise_mode": { + "host": "6667", + "address": "6668" + }, + "brokers": [ + { + "name": "test-project-test-kafka-01", + "host": "test-project-test-kafka-01", + "address": "127.0.0.1" + } + ], + "type": "source" + }, + "dependencies": [] + } +} \ No newline at end of file diff --git a/test/testbench/testbench.go b/test/testbench/testbench.go index 236db41c..7792860e 100644 --- a/test/testbench/testbench.go +++ b/test/testbench/testbench.go @@ -33,7 +33,7 @@ var ( TestNamespace = "default" ) -func SetupTests(t *testing.T, spawnWorkers bool) (context.Context, entropyv1beta1.ModuleServiceClient, entropyv1beta1.ResourceServiceClient, *cli.Config, +func SetupTests(t *testing.T, spawnWorkers bool, setupKube bool) (context.Context, entropyv1beta1.ModuleServiceClient, entropyv1beta1.ResourceServiceClient, *cli.Config, *dockertest.Pool, *dockertest.Resource, *cluster.Provider, func(), func(), func()) { t.Helper() @@ -47,11 +47,14 @@ func SetupTests(t *testing.T, spawnWorkers bool) (context.Context, entropyv1beta t.Fatal(err) } - zapLogger.Info("creating cluster") - provider := cluster.NewProvider() - err = provider.Create(TestClusterName) - if err != nil { - t.Fatal(err) + var provider *cluster.Provider + if setupKube { + zapLogger.Info("creating cluster") + provider = cluster.NewProvider() + err = provider.Create(TestClusterName) + if err != nil { + t.Fatal(err) + } } zapLogger.Info("creating postgres") @@ -118,9 +121,11 @@ func SetupTests(t *testing.T, spawnWorkers bool) (context.Context, entropyv1beta t.Fatal() } - err = BootstrapKubernetesResource(ctx, resourceClient, provider, TestDataPath) - if err != nil { - t.Fatal() + if setupKube { + err = BootstrapKubernetesResource(ctx, resourceClient, provider, TestDataPath) + if err != nil { + t.Fatal() + } } return ctx, moduleClient, resourceClient, appConfig, pool, postgres.GetResource(), provider, cancelModuleClient, cancelResourceClient, cancel