From 8ca3702c7a138dd94b40a4de95670f2c419e37ac Mon Sep 17 00:00:00 2001 From: Ishan Arya Date: Mon, 9 Sep 2024 11:54:50 +0530 Subject: [PATCH 1/8] rebase --- cli/serve.go | 2 ++ modules/flink/config.go | 52 ++++++++++++++++++++++++++++++++ modules/flink/driver.go | 37 +++++++++++++++++++++++ modules/flink/driver_output.go | 35 +++++++++++++++++++++ modules/flink/driver_plan.go | 27 +++++++++++++++++ modules/flink/driver_sync.go | 16 ++++++++++ modules/flink/module.go | 33 ++++++++++++++++++++ modules/flink/schema/config.json | 27 +++++++++++++++++ 8 files changed, 229 insertions(+) create mode 100644 modules/flink/config.go create mode 100644 modules/flink/driver.go create mode 100644 modules/flink/driver_output.go create mode 100644 modules/flink/driver_plan.go create mode 100644 modules/flink/driver_sync.go create mode 100644 modules/flink/module.go create mode 100644 modules/flink/schema/config.json diff --git a/cli/serve.go b/cli/serve.go index ca260bf..1224201 100644 --- a/cli/serve.go +++ b/cli/serve.go @@ -15,6 +15,7 @@ import ( "github.com/goto/entropy/internal/store/postgres" "github.com/goto/entropy/modules" "github.com/goto/entropy/modules/firehose" + "github.com/goto/entropy/modules/flink" "github.com/goto/entropy/modules/job" "github.com/goto/entropy/modules/kafka" "github.com/goto/entropy/modules/kubernetes" @@ -92,6 +93,7 @@ func setupRegistry() module.Registry { firehose.Module, job.Module, kafka.Module, + flink.Module, } registry := &modules.Registry{} diff --git a/modules/flink/config.go b/modules/flink/config.go new file mode 100644 index 0000000..9c6e666 --- /dev/null +++ b/modules/flink/config.go @@ -0,0 +1,52 @@ +package flink + +import ( + _ "embed" + "encoding/json" + + "github.com/goto/entropy/core/resource" + "github.com/goto/entropy/pkg/errors" + "github.com/goto/entropy/pkg/validator" +) + +var ( + //go:embed schema/config.json + configSchemaRaw []byte + + validateConfig = validator.FromJSONSchema(configSchemaRaw) +) + +type Influx struct { + URL string `json:"url,omitempty"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` +} + +type Config struct { + KubeNamespace string `json:"kube_namespace,omitempty"` + Influx Influx `json:"influx,omitempty"` + SinkKafkaStreamName string `json:"sink_kafka_stream_name,omitempty"` +} + +func readConfig(_ resource.Resource, confJSON json.RawMessage, dc driverConf) (*Config, error) { + var cfg Config + if err := json.Unmarshal(confJSON, &cfg); err != nil { + return nil, errors.ErrInvalid.WithMsgf("invalid config json").WithCausef(err.Error()) + } + + if cfg.Influx.URL == "" { + cfg.Influx.URL = dc.Influx.URL + cfg.Influx.Username = dc.Influx.Username + cfg.Influx.Password = dc.Influx.Password + } + + if cfg.SinkKafkaStreamName == "" { + cfg.SinkKafkaStreamName = dc.SinkKafkaStreamName + } + + if cfg.KubeNamespace == "" { + cfg.KubeNamespace = dc.KubeNamespace + } + + return &cfg, nil +} diff --git a/modules/flink/driver.go b/modules/flink/driver.go new file mode 100644 index 0000000..99b741a --- /dev/null +++ b/modules/flink/driver.go @@ -0,0 +1,37 @@ +package flink + +import ( + "encoding/json" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/modules/kubernetes" + "github.com/goto/entropy/pkg/errors" +) + +type flinkDriver struct { + conf driverConf +} + +type driverConf struct { + Influx Influx `json:"influx,omitempty"` + SinkKafkaStreamName string `json:"sink_kafka_stream_name,omitempty"` + KubeNamespace string `json:"kube_namespace,omitempty"` +} + +type Output 
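The fallback merge in readConfig above is "resource config wins, driver defaults fill the blanks", with one subtlety: the Influx block is inherited as a unit whenever influx.url is unset, so a resource cannot override only the username or password. A minimal sketch of the intended behaviour, written as a hypothetical test in package flink (assuming the usual testing and core/resource imports):

    func TestReadConfigDefaults(t *testing.T) {
        dc := driverConf{
            KubeNamespace:       "flink-ns",
            SinkKafkaStreamName: "default-stream",
            Influx:              Influx{URL: "http://influx:8086", Username: "admin"},
        }
        // an empty resource config falls back to the driver defaults for every field
        cfg, err := readConfig(resource.Resource{}, []byte(`{}`), dc)
        if err != nil {
            t.Fatal(err)
        }
        if cfg.KubeNamespace != "flink-ns" || cfg.Influx.Username != "admin" {
            t.Fatalf("driver defaults should fill empty fields: %+v", cfg)
        }
    }
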
struct { + KubeCluster kubernetes.Output `json:"kube_cluster,omitempty"` + KubeNamespace string `json:"kube_namespace,omitempty"` + Influx Influx `json:"influx,omitempty"` + SinkKafkaStreamName string `json:"sink_kafka_stream_name,omitempty"` +} + +func readOutputData(exr module.ExpandedResource) (*Output, error) { + var curOut Output + if len(exr.Resource.State.Output) == 0 { + return &curOut, nil + } + if err := json.Unmarshal(exr.Resource.State.Output, &curOut); err != nil { + return nil, errors.ErrInternal.WithMsgf("corrupted output").WithCausef(err.Error()) + } + return &curOut, nil +} diff --git a/modules/flink/driver_output.go b/modules/flink/driver_output.go new file mode 100644 index 0000000..d1072c4 --- /dev/null +++ b/modules/flink/driver_output.go @@ -0,0 +1,35 @@ +package flink + +import ( + "context" + "encoding/json" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/modules" + "github.com/goto/entropy/modules/kubernetes" + "github.com/goto/entropy/pkg/errors" +) + +func (fd *flinkDriver) Output(ctx context.Context, exr module.ExpandedResource) (json.RawMessage, error) { + output, err := readOutputData(exr) + if err != nil { + return nil, err + } + + conf, err := readConfig(exr.Resource, exr.Spec.Configs, fd.conf) + if err != nil { + return nil, errors.ErrInternal.WithCausef(err.Error()) + } + + var kubeOut kubernetes.Output + if err := json.Unmarshal(exr.Dependencies[keyKubeDependency].Output, &kubeOut); err != nil { + return nil, errors.ErrInternal.WithMsgf("invalid kube state").WithCausef(err.Error()) + } + + output.KubeCluster = kubeOut + output.Influx = conf.Influx + output.KubeNamespace = conf.KubeNamespace + output.SinkKafkaStreamName = conf.SinkKafkaStreamName + + return modules.MustJSON(output), nil +} diff --git a/modules/flink/driver_plan.go b/modules/flink/driver_plan.go new file mode 100644 index 0000000..6bfe447 --- /dev/null +++ b/modules/flink/driver_plan.go @@ -0,0 +1,27 @@ +package flink + +import ( + "context" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/core/resource" +) + +func (fd *flinkDriver) Plan(ctx context.Context, res module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { + res.Resource.Spec = resource.Spec{ + Configs: act.Params, + Dependencies: res.Spec.Dependencies, + } + + output, err := fd.Output(ctx, res) + if err != nil { + return nil, err + } + + res.Resource.State = resource.State{ + Status: resource.StatusCompleted, + Output: output, + } + + return &res.Resource, nil +} diff --git a/modules/flink/driver_sync.go b/modules/flink/driver_sync.go new file mode 100644 index 0000000..f32c891 --- /dev/null +++ b/modules/flink/driver_sync.go @@ -0,0 +1,16 @@ +package flink + +import ( + "context" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/core/resource" +) + +func (*flinkDriver) Sync(_ context.Context, res module.ExpandedResource) (*resource.State, error) { + return &resource.State{ + Status: resource.StatusCompleted, + Output: res.Resource.State.Output, + ModuleData: nil, + }, nil +} diff --git a/modules/flink/module.go b/modules/flink/module.go new file mode 100644 index 0000000..4a319d2 --- /dev/null +++ b/modules/flink/module.go @@ -0,0 +1,33 @@ +package flink + +import ( + _ "embed" + "encoding/json" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/pkg/errors" +) + +const ( + keyKubeDependency = "kube_cluster" +) + +var Module = module.Descriptor{ + Kind: "flink", + Actions: []module.ActionDesc{ + { + Name: module.CreateAction, + 
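Before the descriptor continues, one behavioural note: flink is a configuration-only module. Plan computes the output and marks the resource StatusCompleted in place (no NextSyncAt, no pending steps), and Sync simply echoes the stored output, so the worker never has real reconciliation work. Dependent modules consume the output roughly like this (a sketch; the dagger module added later in this series does exactly this under the dependency key "flink"):

    var flinkOut flink.Output
    if err := json.Unmarshal(exr.Dependencies["flink"].Output, &flinkOut); err != nil {
        return nil, errors.ErrInternal.WithMsgf("invalid flink state").WithCausef(err.Error())
    }
    // flinkOut now carries the kube cluster connection, namespace, influx
    // credentials and the sink kafka stream resolved by this module.
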
}, + { + Name: module.UpdateAction, + }, + }, + DriverFactory: func(conf json.RawMessage) (module.Driver, error) { + fd := &flinkDriver{} + err := json.Unmarshal(conf, &fd) + if err != nil { + return nil, errors.ErrInvalid.WithMsgf("failed to unmarshal module config: %v", err) + } + return fd, nil + }, +} diff --git a/modules/flink/schema/config.json b/modules/flink/schema/config.json new file mode 100644 index 0000000..e9e5bd4 --- /dev/null +++ b/modules/flink/schema/config.json @@ -0,0 +1,27 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "kube_namespace": { + "type": "string" + }, + "influx": { + "type": "object", + "properties": { + "url": { + "type": "string" + }, + "username": { + "type": "string" + }, + "password": { + "type": "string" + } + } + }, + "sink_kafka_stream_name": { + "type": "string" + } + } +} \ No newline at end of file From 01b77f466507bb64f490c2787cd16d9447838ac5 Mon Sep 17 00:00:00 2001 From: Ishan Arya Date: Mon, 9 Sep 2024 15:49:08 +0530 Subject: [PATCH 2/8] test: add e2e-test --- modules/flink/driver_output.go | 5 +- test/e2e_test/firehose_test.go | 2 +- test/e2e_test/flink_test.go | 132 ++++++++++++++++++ test/testbench/bootstrap.go | 25 ++++ .../test_data/module/flink_module.json | 5 + .../test_data/resource/flink_resource.json | 25 ++++ test/testbench/testbench.go | 5 + 7 files changed, 197 insertions(+), 2 deletions(-) create mode 100644 test/e2e_test/flink_test.go create mode 100644 test/testbench/test_data/module/flink_module.json create mode 100644 test/testbench/test_data/resource/flink_resource.json diff --git a/modules/flink/driver_output.go b/modules/flink/driver_output.go index d1072c4..28f2efd 100644 --- a/modules/flink/driver_output.go +++ b/modules/flink/driver_output.go @@ -16,8 +16,11 @@ func (fd *flinkDriver) Output(ctx context.Context, exr module.ExpandedResource) return nil, err } - conf, err := readConfig(exr.Resource, exr.Spec.Configs, fd.conf) + conf, err := readConfig(exr.Resource, exr.Resource.Spec.Configs, fd.conf) if err != nil { + if errors.Is(err, errors.ErrInvalid) { + return nil, err + } return nil, errors.ErrInternal.WithCausef(err.Error()) } diff --git a/test/e2e_test/firehose_test.go b/test/e2e_test/firehose_test.go index f63033b..d411a61 100644 --- a/test/e2e_test/firehose_test.go +++ b/test/e2e_test/firehose_test.go @@ -36,7 +36,7 @@ func (s *FirehoseTestSuite) SetupTest() { modules, err := s.moduleClient.ListModules(s.ctx, &entropyv1beta1.ListModulesRequest{}) s.Require().NoError(err) - s.Require().Equal(6, len(modules.GetModules())) + s.Require().Equal(9, len(modules.GetModules())) resources, err := s.resourceClient.ListResources(s.ctx, &entropyv1beta1.ListResourcesRequest{ Kind: "kubernetes", diff --git a/test/e2e_test/flink_test.go b/test/e2e_test/flink_test.go new file mode 100644 index 0000000..ead77f6 --- /dev/null +++ b/test/e2e_test/flink_test.go @@ -0,0 +1,132 @@ +package e2e_test + +import ( + "context" + "encoding/json" + "os" + "testing" + + "github.com/goto/entropy/cli" + entropyv1beta1 "github.com/goto/entropy/proto/gotocompany/entropy/v1beta1" + "github.com/goto/entropy/test/testbench" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/suite" + "sigs.k8s.io/kind/pkg/cluster" +) + +type FlinkTestSuite struct { + suite.Suite + ctx context.Context + moduleClient entropyv1beta1.ModuleServiceClient + resourceClient entropyv1beta1.ResourceServiceClient + cancelModuleClient func() + 
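Two details in this test patch are easy to miss. The expected module count moves from 6 to 9 because the testbench bootstraps each module once per test project and there are three projects, so BootstrapFlinkModule (added below) contributes three entries. And the driver_output.go change stops re-wrapping readConfig's ErrInvalid as ErrInternal, which matters for API semantics; a sketch of the assumed error mapping, consistent with the commented-out expectation in this suite:

    conf, err := readConfig(exr.Resource, exr.Resource.Spec.Configs, fd.conf)
    if err != nil {
        if errors.Is(err, errors.ErrInvalid) {
            return nil, err // assumed to surface to clients as codes.InvalidArgument
        }
        return nil, errors.ErrInternal.WithCausef(err.Error()) // codes.Internal
    }
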
cancelResourceClient func() + cancel func() + resource *dockertest.Resource + pool *dockertest.Pool + appConfig *cli.Config + kubeProvider *cluster.Provider +} + +func (s *FlinkTestSuite) SetupTest() { + s.ctx, s.moduleClient, s.resourceClient, s.appConfig, s.pool, s.resource, s.kubeProvider, s.cancelModuleClient, s.cancelResourceClient, s.cancel = testbench.SetupTests(s.T(), true, true) + + modules, err := s.moduleClient.ListModules(s.ctx, &entropyv1beta1.ListModulesRequest{}) + s.Require().NoError(err) + s.Require().Equal(9, len(modules.GetModules())) + + resources, err := s.resourceClient.ListResources(s.ctx, &entropyv1beta1.ListResourcesRequest{ + Kind: "kubernetes", + }) + s.Require().NoError(err) + s.Require().Equal(3, len(resources.GetResources())) +} + +func (s *FlinkTestSuite) TestFlink() { + s.Run("create flink module return success", func() { + moduleData, err := os.ReadFile(testbench.TestDataPath + "module/flink_module.json") + if err != nil { + s.T().Fatal(err) + } + + var moduleConfig *entropyv1beta1.Module + err = json.Unmarshal(moduleData, &moduleConfig) + if err != nil { + s.T().Fatal(err) + } + _, err = s.moduleClient.CreateModule(s.ctx, &entropyv1beta1.CreateModuleRequest{ + Module: moduleConfig, + }) + s.Require().NoError(err) + }) + /* + s.Run("create flink with invalid config will return invalid error", func() { + _, err := s.resourceClient.CreateResource(s.ctx, &entropyv1beta1.CreateResourceRequest{ + Resource: &entropyv1beta1.Resource{ + Name: "test-flink", + Project: "test-project", + Kind: "flink", + Spec: &entropyv1beta1.ResourceSpec{ + Configs: structpb.NewStringValue("{}"), + Dependencies: []*entropyv1beta1.ResourceDependency{}, + }, + }, + }) + s.Assert().Equal(codes.InvalidArgument, status.Convert(err).Code()) + }) + */ + s.Run("create flink with right config will return success", func() { + resourceData, err := os.ReadFile(testbench.TestDataPath + "/resource/flink_resource.json") + if err != nil { + s.T().Fatal(err) + } + + var resourceConfig *entropyv1beta1.Resource + err = json.Unmarshal(resourceData, &resourceConfig) + if err != nil { + s.T().Fatal(err) + } + + _, err = s.resourceClient.CreateResource(s.ctx, &entropyv1beta1.CreateResourceRequest{ + Resource: resourceConfig, + }) + s.Require().NoError(err) + }) + + resources, err := s.resourceClient.ListResources(s.ctx, &entropyv1beta1.ListResourcesRequest{ + Kind: "flink", + }) + s.Require().NoError(err) + s.Require().Equal(1, len(resources.GetResources())) + + s.Run("update flink with right config will return success", func() { + resourceData, err := os.ReadFile(testbench.TestDataPath + "/resource/flink_resource.json") + if err != nil { + s.T().Fatal(err) + } + + var resourceConfig *entropyv1beta1.Resource + err = json.Unmarshal(resourceData, &resourceConfig) + if err != nil { + s.T().Fatal(err) + } + + resourceConfig.Spec.Dependencies = nil + + _, err = s.resourceClient.UpdateResource(s.ctx, &entropyv1beta1.UpdateResourceRequest{ + Urn: resources.GetResources()[0].Urn, + NewSpec: resourceConfig.Spec, + }) + s.Require().NoError(err) + }) +} + +func (s *FlinkTestSuite) TearDownTest() { + if err := s.pool.Purge(s.resource); err != nil { + s.T().Fatal(err) + } +} + +func TestFlinkTestSuite(t *testing.T) { + suite.Run(t, new(FlinkTestSuite)) +} diff --git a/test/testbench/bootstrap.go b/test/testbench/bootstrap.go index d67e44e..054fdc7 100644 --- a/test/testbench/bootstrap.go +++ b/test/testbench/bootstrap.go @@ -60,6 +60,31 @@ func BootstrapFirehoseModule(ctx context.Context, client entropyv1beta1.ModuleSe 
return nil } +func BootstrapFlinkModule(ctx context.Context, client entropyv1beta1.ModuleServiceClient, testDataPath string) error { + moduleData, err := os.ReadFile(testDataPath + "/module/flink_module.json") + if err != nil { + return err + } + + var moduleConfig *entropyv1beta1.Module + if err = json.Unmarshal(moduleData, &moduleConfig); err != nil { + return err + } + + project := moduleConfig.Project + for i := 0; i < 3; i++ { + moduleConfig.Project = fmt.Sprintf("%s-%d", project, i) + + if _, err := client.CreateModule(ctx, &entropyv1beta1.CreateModuleRequest{ + Module: moduleConfig, + }); err != nil { + return err + } + } + + return nil +} + func BootstrapKubernetesResource(ctx context.Context, client entropyv1beta1.ResourceServiceClient, kubeProvider *cluster.Provider, testDataPath string) error { resourceData, err := os.ReadFile(testDataPath + "/resource/kubernetes_resource.json") if err != nil { diff --git a/test/testbench/test_data/module/flink_module.json b/test/testbench/test_data/module/flink_module.json new file mode 100644 index 0000000..7e88730 --- /dev/null +++ b/test/testbench/test_data/module/flink_module.json @@ -0,0 +1,5 @@ +{ + "name": "flink", + "project": "test-project", + "configs": {} +} \ No newline at end of file diff --git a/test/testbench/test_data/resource/flink_resource.json b/test/testbench/test_data/resource/flink_resource.json new file mode 100644 index 0000000..ada2bf2 --- /dev/null +++ b/test/testbench/test_data/resource/flink_resource.json @@ -0,0 +1,25 @@ +{ + "kind": "flink", + "name": "test-flink", + "project": "test-project-0", + "labels": { + "description": "test flink resource" + }, + "spec": { + "configs": { + "influx": { + "password": "influx-password", + "url": "localhost:1234", + "username": "influx-user" + }, + "kube_namespace": "flink-ns", + "sink_kafka_stream_name": "flinkstream" + }, + "dependencies": [ + { + "key": "kube_cluster", + "value": "orn:entropy:kubernetes:test-project-0:test-kube" + } + ] + } +} \ No newline at end of file diff --git a/test/testbench/testbench.go b/test/testbench/testbench.go index 7792860..e71ea50 100644 --- a/test/testbench/testbench.go +++ b/test/testbench/testbench.go @@ -121,6 +121,11 @@ func SetupTests(t *testing.T, spawnWorkers bool, setupKube bool) (context.Contex t.Fatal() } + err = BootstrapFlinkModule(ctx, moduleClient, TestDataPath) + if err != nil { + t.Fatal() + } + if setupKube { err = BootstrapKubernetesResource(ctx, resourceClient, provider, TestDataPath) if err != nil { From d8a9064f9e59b7e324d684b2039f03f40dd0af4f Mon Sep 17 00:00:00 2001 From: Ishan Arya Date: Mon, 9 Sep 2024 16:00:33 +0530 Subject: [PATCH 3/8] test: fix TearDown --- test/e2e_test/flink_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/e2e_test/flink_test.go b/test/e2e_test/flink_test.go index ead77f6..462859e 100644 --- a/test/e2e_test/flink_test.go +++ b/test/e2e_test/flink_test.go @@ -125,6 +125,12 @@ func (s *FlinkTestSuite) TearDownTest() { if err := s.pool.Purge(s.resource); err != nil { s.T().Fatal(err) } + + if err := s.kubeProvider.Delete(testbench.TestClusterName, ""); err != nil { + s.T().Fatal(err) + } + + s.cancel() } func TestFlinkTestSuite(t *testing.T) { From 06c06dd9d0c27c4c981188cbfd034c9fa13d78e3 Mon Sep 17 00:00:00 2001 From: Ishan Arya Date: Mon, 9 Sep 2024 16:03:43 +0530 Subject: [PATCH 4/8] test: fix test --- test/e2e_test/worker_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e_test/worker_test.go b/test/e2e_test/worker_test.go index 
b22951b..96b37bc 100644 --- a/test/e2e_test/worker_test.go +++ b/test/e2e_test/worker_test.go @@ -34,7 +34,7 @@ func (s *WorkerTestSuite) SetupTest() { modules, err := s.moduleClient.ListModules(s.ctx, &entropyv1beta1.ListModulesRequest{}) s.Require().NoError(err) - s.Require().Equal(6, len(modules.GetModules())) + s.Require().Equal(9, len(modules.GetModules())) resources, err := s.resourceClient.ListResources(s.ctx, &entropyv1beta1.ListResourcesRequest{ Kind: "kubernetes", From 1a06848fe1de08c4630496a6d66030615c156e9e Mon Sep 17 00:00:00 2001 From: Ishan Arya Date: Mon, 9 Sep 2024 16:16:26 +0530 Subject: [PATCH 5/8] feat: change var name --- modules/flink/config.go | 10 +++++----- modules/flink/driver.go | 14 +++++++------- modules/flink/driver_output.go | 2 +- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/modules/flink/config.go b/modules/flink/config.go index 9c6e666..24a06a2 100644 --- a/modules/flink/config.go +++ b/modules/flink/config.go @@ -23,9 +23,9 @@ type Influx struct { } type Config struct { - KubeNamespace string `json:"kube_namespace,omitempty"` - Influx Influx `json:"influx,omitempty"` - SinkKafkaStreamName string `json:"sink_kafka_stream_name,omitempty"` + KubeNamespace string `json:"kube_namespace,omitempty"` + Influx Influx `json:"influx,omitempty"` + SinkKafkaStream string `json:"sink_kafka_stream,omitempty"` } func readConfig(_ resource.Resource, confJSON json.RawMessage, dc driverConf) (*Config, error) { @@ -40,8 +40,8 @@ func readConfig(_ resource.Resource, confJSON json.RawMessage, dc driverConf) (* cfg.Influx.Password = dc.Influx.Password } - if cfg.SinkKafkaStreamName == "" { - cfg.SinkKafkaStreamName = dc.SinkKafkaStreamName + if cfg.SinkKafkaStream == "" { + cfg.SinkKafkaStream = dc.SinkKafkaStream } if cfg.KubeNamespace == "" { diff --git a/modules/flink/driver.go b/modules/flink/driver.go index 99b741a..64a5365 100644 --- a/modules/flink/driver.go +++ b/modules/flink/driver.go @@ -13,16 +13,16 @@ type flinkDriver struct { } type driverConf struct { - Influx Influx `json:"influx,omitempty"` - SinkKafkaStreamName string `json:"sink_kafka_stream_name,omitempty"` - KubeNamespace string `json:"kube_namespace,omitempty"` + Influx Influx `json:"influx,omitempty"` + SinkKafkaStream string `json:"sink_kafka_stream,omitempty"` + KubeNamespace string `json:"kube_namespace,omitempty"` } type Output struct { - KubeCluster kubernetes.Output `json:"kube_cluster,omitempty"` - KubeNamespace string `json:"kube_namespace,omitempty"` - Influx Influx `json:"influx,omitempty"` - SinkKafkaStreamName string `json:"sink_kafka_stream_name,omitempty"` + KubeCluster kubernetes.Output `json:"kube_cluster,omitempty"` + KubeNamespace string `json:"kube_namespace,omitempty"` + Influx Influx `json:"influx,omitempty"` + SinkKafkaStream string `json:"sink_kafka_stream,omitempty"` } func readOutputData(exr module.ExpandedResource) (*Output, error) { diff --git a/modules/flink/driver_output.go b/modules/flink/driver_output.go index 28f2efd..2182308 100644 --- a/modules/flink/driver_output.go +++ b/modules/flink/driver_output.go @@ -32,7 +32,7 @@ func (fd *flinkDriver) Output(ctx context.Context, exr module.ExpandedResource) output.KubeCluster = kubeOut output.Influx = conf.Influx output.KubeNamespace = conf.KubeNamespace - output.SinkKafkaStreamName = conf.SinkKafkaStreamName + output.SinkKafkaStream = conf.SinkKafkaStream return modules.MustJSON(output), nil } From 0a925bf8da1a9c98e30e4e6edd3b5bb4e89910df Mon Sep 17 00:00:00 2001 From: Ishan Arya Date: Mon, 30 Sep 2024 
14:32:01 +0530 Subject: [PATCH 6/8] feat: add dagger module create & update action (#107) * feat: add dagger module * feat: add flink dep * feat: add transformations * fix: read stream from config root * feat: add Plan implementation * fix: chart values * fix: resolve TODOs and refactored * fix: source sink base handling * feat: Output to have CR details * feat: handle status * refactor: seperate contants by type * refactor: kubeGetCRD function * feat: add dagger update action * fix: add Update action * chore: change var name to sink_kafka_stream * feat: merge consumer group ID if sink is same --------- Co-authored-by: Ishan Arya --- cli/serve.go | 2 + modules/dagger/config.go | 349 ++++++++++++++++++ modules/dagger/driver.go | 243 ++++++++++++ modules/dagger/driver_output.go | 60 +++ modules/dagger/driver_plan.go | 144 ++++++++ modules/dagger/driver_sync.go | 88 +++++ modules/dagger/module.go | 127 +++++++ modules/dagger/schema/config.json | 49 +++ modules/flink/config.go | 10 + modules/flink/driver.go | 4 + modules/flink/driver_output.go | 2 + modules/flink/schema/config.json | 2 +- pkg/helm/helm.go | 8 +- pkg/kube/client.go | 33 ++ .../test_data/resource/flink_resource.json | 2 +- 15 files changed, 1118 insertions(+), 5 deletions(-) create mode 100644 modules/dagger/config.go create mode 100644 modules/dagger/driver.go create mode 100644 modules/dagger/driver_output.go create mode 100644 modules/dagger/driver_plan.go create mode 100644 modules/dagger/driver_sync.go create mode 100644 modules/dagger/module.go create mode 100644 modules/dagger/schema/config.json diff --git a/cli/serve.go b/cli/serve.go index 1224201..095f33b 100644 --- a/cli/serve.go +++ b/cli/serve.go @@ -14,6 +14,7 @@ import ( entropyserver "github.com/goto/entropy/internal/server" "github.com/goto/entropy/internal/store/postgres" "github.com/goto/entropy/modules" + "github.com/goto/entropy/modules/dagger" "github.com/goto/entropy/modules/firehose" "github.com/goto/entropy/modules/flink" "github.com/goto/entropy/modules/job" @@ -94,6 +95,7 @@ func setupRegistry() module.Registry { job.Module, kafka.Module, flink.Module, + dagger.Module, } registry := &modules.Registry{} diff --git a/modules/dagger/config.go b/modules/dagger/config.go new file mode 100644 index 0000000..d389cda --- /dev/null +++ b/modules/dagger/config.go @@ -0,0 +1,349 @@ +package dagger + +import ( + _ "embed" + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/modules" + "github.com/goto/entropy/modules/flink" + "github.com/goto/entropy/pkg/errors" + "github.com/goto/entropy/pkg/validator" +) + +const ( + helmReleaseNameMaxLength = 53 +) + +// Stream-related constants +const ( + keyStreams = "STREAMS" + keySinkType = "SINK_TYPE" +) + +// Flink-related constants +const ( + keyFlinkJobID = "FLINK_JOB_ID" +) + +// Influx-related constants +const ( + keySinkInfluxURL = "SINK_INFLUX_URL" + keySinkInfluxPassword = "SINK_INFLUX_PASSWORD" + keySinkInfluxDBName = "SINK_INFLUX_DB_NAME" + keySinkInfluxUsername = "SINK_INFLUX_USERNAME" + keySinkInfluxMeasurementName = "SINK_INFLUX_MEASUREMENT_NAME" +) + +// Kafka-related constants +const ( + SourceKafkaConsumerConfigAutoCommitEnable = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE" + SourceKafkaConsumerConfigAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET" + SourceKafkaConsumerConfigBootstrapServers = "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS" + keySinkKafkaBrokers = "SINK_KAFKA_BROKERS" + keySinkKafkaStream = 
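These constant groups mirror dagger's runtime environment variable names one-to-one; most are pass-through, but two are derived from structured config later in this patch: readConfig copies SinkType into SINK_TYPE, and planCreate serialises the source list into STREAMS, roughly:

    cfg.EnvVariables[keySinkType] = cfg.SinkType                          // in readConfig
    conf.EnvVariables[keyStreams] = string(mustMarshalJSON(conf.Source))  // in planCreate
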
"SINK_KAFKA_STREAM" + keySinkKafkaProtoMsg = "SINK_KAFKA_PROTO_MESSAGE" + keySinkKafkaTopic = "SINK_KAFKA_TOPIC" + keySinkKafkaKey = "SINK_KAFKA_PROTO_KEY" + keySinkKafkaLingerMs = "SINK_KAFKA_LINGER_MS" +) + +// Sink types +const ( + SinkTypeInflux = "INFLUX" + SinkTypeKafka = "KAFKA" + SinkTypeBigquery = "BIGQUERY" +) + +// BigQuery-related constants +const ( + keySinkBigqueryGoogleCloudProjectID = "SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID" + keySinkBigqueryDatasetName = "SINK_BIGQUERY_DATASET_NAME" + keySinkBigqueryTableName = "SINK_BIGQUERY_TABLE_NAME" + keySinkBigqueryDatasetLabels = "SINK_BIGQUERY_DATASET_LABELS" + keySinkBigqueryTableLabels = "SINK_BIGQUERY_TABLE_LABELS" + keySinkBigqueryTablePartitioningEnable = "SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE" + keySinkBigqueryTableClusteringEnable = "SINK_BIGQUERY_TABLE_CLUSTERING_ENABLE" + keySinkBigqueryBatchSize = "SINK_BIGQUERY_BATCH_SIZE" + keySinkBigqueryTablePartitionKey = "SINK_BIGQUERY_TABLE_PARTITION_KEY" + keySinkBigqueryRowInsertIDEnable = "SINK_BIGQUERY_ROW_INSERT_ID_ENABLE" + keySinkBigqueryClientReadTimeoutMs = "SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS" + keySinkBigqueryClientConnectTimeoutMs = "SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS" + keySinkBigqueryTablePartitionExpiryMs = "SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS" + keySinkBigqueryDatasetLocation = "SINK_BIGQUERY_DATASET_LOCATION" + keySinkErrorTypesForFailure = "SINK_ERROR_TYPES_FOR_FAILURE" + keySinkBigqueryTableClusteringKeys = "SINK_BIGQUERY_TABLE_CLUSTERING_KEYS" +) + +var ( + //go:embed schema/config.json + configSchemaRaw []byte + + validateConfig = validator.FromJSONSchema(configSchemaRaw) +) + +type UsageSpec struct { + CPU string `json:"cpu,omitempty" validate:"required"` + Memory string `json:"memory,omitempty" validate:"required"` +} + +type Resources struct { + TaskManager UsageSpec `json:"taskmanager,omitempty"` + JobManager UsageSpec `json:"jobmanager,omitempty"` +} + +type Config struct { + Resources Resources `json:"resources,omitempty"` + Source []Source `json:"source,omitempty"` + Sink Sink `json:"sink,omitempty"` + EnvVariables map[string]string `json:"env_variables,omitempty"` + Replicas int `json:"replicas"` + SinkType string `json:"sink_type"` + Team string `json:"team"` + FlinkName string `json:"flink_name,omitempty"` + DeploymentID string `json:"deployment_id,omitempty"` + Savepoint any `json:"savepoint,omitempty"` + ChartValues *ChartValues `json:"chart_values,omitempty"` + Deleted bool `json:"deleted,omitempty"` + Namespace string `json:"namespace,omitempty"` + PrometheusURL string `json:"prometheus_url,omitempty"` + JarURI string `json:"jar_uri,omitempty"` +} + +type ChartValues struct { + ImageRepository string `json:"image_repository" validate:"required"` + ImageTag string `json:"image_tag" validate:"required"` + ChartVersion string `json:"chart_version" validate:"required"` + ImagePullPolicy string `json:"image_pull_policy"` +} + +type SourceDetail struct { + SourceName string `json:"SOURCE_NAME"` + SourceType string `json:"SOURCE_TYPE"` +} + +type SourceKafka struct { + SourceKafkaConsumerConfigAutoCommitEnable string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE"` + SourceKafkaConsumerConfigAutoOffsetReset string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"` + SourceKafkaTopicNames string `json:"SOURCE_KAFKA_TOPIC_NAMES"` + SourceKafkaName string `json:"SOURCE_KAFKA_NAME"` + SourceKafkaConsumerConfigGroupID string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID"` + SourceKafkaConsumerConfigBootstrapServers string 
`json:"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS"` +} + +type SourceParquet struct { + SourceParquetFileDateRange interface{} `json:"SOURCE_PARQUET_FILE_DATE_RANGE"` + SourceParquetFilePaths []string `json:"SOURCE_PARQUET_FILE_PATHS"` +} + +type Source struct { + InputSchemaProtoClass string `json:"INPUT_SCHEMA_PROTO_CLASS"` + InputSchemaEventTimestampFieldIndex string `json:"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX"` + SourceDetails []SourceDetail `json:"SOURCE_DETAILS"` + InputSchemaTable string `json:"INPUT_SCHEMA_TABLE"` + SourceKafka + SourceParquet +} + +type SinkKafka struct { + SinkKafkaBrokers string `json:"SINK_KAFKA_BROKERS"` + SinkKafkaStream string `json:"SINK_KAFKA_STREAM"` + SinkKafkaTopic string `json:"SINK_KAFKA_TOPIC"` + SinkKafkaProtoMsg string `json:"SINK_KAFKA_PROTO_MESSAGE"` + SinkKafkaLingerMs string `json:"SINK_KAFKA_LINGER_MS"` + SinkKafkaProtoKey string `json:"SINK_KAFKA_PROTO_KEY"` +} + +type SinkInflux struct { + SinkInfluxDBName string `json:"SINK_INFLUX_DB_NAME"` + SinkInfluxMeasurementName string `json:"SINK_INFLUX_MEASUREMENT_NAME"` +} + +type SinkBigquery struct { + SinkBigqueryGoogleCloudProjectID string `json:"SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID"` + SinkBigqueryTableName string `json:"SINK_BIGQUERY_TABLE_NAME"` + SinkBigqueryDatasetLabels string `json:"SINK_BIGQUERY_DATASET_LABELS"` + SinkBigqueryTableLabels string `json:"SINK_BIGQUERY_TABLE_LABELS"` + SinkBigqueryDatasetName string `json:"SINK_BIGQUERY_DATASET_NAME"` + SinkBigqueryTablePartitioningEnable string `json:"SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"` + SinkBigqueryTablePartitionKey string `json:"SINK_BIGQUERY_TABLE_PARTITION_KEY"` + SinkBigqueryRowInsertIDEnable string `json:"SINK_BIGQUERY_ROW_INSERT_ID_ENABLE"` + SinkBigqueryClientReadTimeoutMs string `json:"SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS"` + SinkBigqueryClientConnectTimeoutMs string `json:"SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS"` + SinkBigqueryTablePartitionExpiryMs string `json:"SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS"` + SinkBigqueryDatasetLocation string `json:"SINK_BIGQUERY_DATASET_LOCATION"` + SinkBigqueryBatchSize string `json:"SINK_BIGQUERY_BATCH_SIZE"` + SinkBigqueryTableClusteringEnable string `json:"SINK_BIGQUERY_TABLE_CLUSTERING_ENABLE"` + SinkBigqueryTableClusteringKeys string `json:"SINK_BIGQUERY_TABLE_CLUSTERING_KEYS"` + SinkErrorTypesForFailure string `json:"SINK_ERROR_TYPES_FOR_FAILURE"` +} + +type Sink struct { + SinkKafka + SinkInflux + SinkBigquery +} + +func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverConf) (*Config, error) { + var cfg Config + err := json.Unmarshal(confJSON, &cfg) + if err != nil { + return nil, errors.ErrInvalid.WithMsgf("invalid config json").WithCausef(err.Error()) + } + + //transformation #9 and #11 + //transformation #1 + source := cfg.Source + + for i := range source { + if source[i].SourceParquet.SourceParquetFilePaths != nil && len(source[i].SourceParquet.SourceParquetFilePaths) > 0 { + //source is parquete + //do nothing + continue + } + //TODO: check how to handle increment group id on update + if source[i].SourceKafkaConsumerConfigGroupID == "" { + source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(r.Name+"-0001", i) + } + source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable] + source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset] + source[i].SourceKafkaConsumerConfigBootstrapServers = 
dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers] + } + + cfg.Source = source + + //transformation #2 + cfg.EnvVariables = modules.CloneAndMergeMaps(dc.EnvVariables, cfg.EnvVariables) + + //transformation #3 + var flinkOut flink.Output + if err := json.Unmarshal(r.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil { + return nil, errors.ErrInternal.WithMsgf("invalid flink state").WithCausef(err.Error()) + } + + cfg.Namespace = flinkOut.KubeNamespace + + //transformation #4 + //transform resource name to safe length + + //transformation #5 + //TODO: build name from title as project--dagger + cfg.EnvVariables[keyFlinkJobID] = r.Name + + //transformation #6 + // note: enforce the kubernetes deployment name length limit. + if len(cfg.DeploymentID) == 0 { + cfg.DeploymentID = modules.SafeName(fmt.Sprintf("%s-%s", r.Project, r.Name), "-dagger", helmReleaseNameMaxLength) + } else if len(cfg.DeploymentID) > helmReleaseNameMaxLength { + return nil, errors.ErrInvalid.WithMsgf("deployment_id must not have more than 53 chars") + } + + //transformation #7 + cfg.EnvVariables[keySinkInfluxURL] = flinkOut.Influx.URL + cfg.EnvVariables[keySinkInfluxPassword] = flinkOut.Influx.Password + cfg.EnvVariables[keySinkInfluxUsername] = flinkOut.Influx.Username + //SINK_INFLUX_DB_NAME is added by client + //SINK_INFLUX_MEASUREMENT_NAME is added by client + //REDIS_SERVER is skipped + + //transformation #8 + //Longbow configs would be in base configs + + //transformation #10 + //this shall check if the project of the conf.EnvVars.STREAMS is same as that of the corresponding flink + //do we need to check this? + + //transformation #13 + cfg.EnvVariables[keySinkType] = cfg.SinkType + if cfg.SinkType == SinkTypeKafka { + cfg.EnvVariables[keySinkKafkaStream] = cfg.Sink.SinkKafka.SinkKafkaStream + cfg.EnvVariables[keySinkKafkaBrokers] = cfg.Sink.SinkKafka.SinkKafkaBrokers + cfg.EnvVariables[keySinkKafkaProtoMsg] = cfg.Sink.SinkKafka.SinkKafkaProtoMsg + cfg.EnvVariables[keySinkKafkaTopic] = cfg.Sink.SinkKafka.SinkKafkaTopic + cfg.EnvVariables[keySinkKafkaKey] = cfg.Sink.SinkKafka.SinkKafkaProtoKey + cfg.EnvVariables[keySinkKafkaLingerMs] = cfg.Sink.SinkKafka.SinkKafkaLingerMs + } else if cfg.SinkType == SinkTypeInflux { + cfg.EnvVariables[keySinkInfluxDBName] = cfg.Sink.SinkInflux.SinkInfluxDBName + cfg.EnvVariables[keySinkInfluxMeasurementName] = cfg.Sink.SinkInflux.SinkInfluxMeasurementName + } else if cfg.SinkType == SinkTypeBigquery { + cfg.EnvVariables[keySinkBigqueryGoogleCloudProjectID] = cfg.Sink.SinkBigquery.SinkBigqueryGoogleCloudProjectID + cfg.EnvVariables[keySinkBigqueryDatasetName] = cfg.Sink.SinkBigquery.SinkBigqueryDatasetName + cfg.EnvVariables[keySinkBigqueryTableName] = cfg.Sink.SinkBigquery.SinkBigqueryTableName + cfg.EnvVariables[keySinkBigqueryDatasetLabels] = cfg.Sink.SinkBigquery.SinkBigqueryDatasetLabels + cfg.EnvVariables[keySinkBigqueryTableLabels] = cfg.Sink.SinkBigquery.SinkBigqueryTableLabels + cfg.EnvVariables[keySinkBigqueryTablePartitioningEnable] = cfg.Sink.SinkBigquery.SinkBigqueryTablePartitioningEnable + cfg.EnvVariables[keySinkBigqueryTablePartitionKey] = cfg.Sink.SinkBigquery.SinkBigqueryTablePartitionKey + cfg.EnvVariables[keySinkBigqueryRowInsertIDEnable] = cfg.Sink.SinkBigquery.SinkBigqueryRowInsertIDEnable + cfg.EnvVariables[keySinkBigqueryClientReadTimeoutMs] = cfg.Sink.SinkBigquery.SinkBigqueryClientReadTimeoutMs + cfg.EnvVariables[keySinkBigqueryClientConnectTimeoutMs] = cfg.Sink.SinkBigquery.SinkBigqueryClientConnectTimeoutMs + 
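For reference, the consumer-group helper called earlier in readConfig (incrementGroupId, defined at the end of this file) does dash-tokenised, zero-padded arithmetic on the final segment of the group ID. Worked cases under that assumption:

    // stream 0 of resource "orders": incrementGroupId("orders-0001", 0) == "orders-0001"
    // stream 2 of the same resource: incrementGroupId("orders-0001", 2) == "orders-0003"
    //
    // caveat: the strconv.Atoi error is discarded, so a non-numeric final
    // segment is silently replaced rather than preserved:
    //   incrementGroupId("orders-dagger", 1) == "orders-0001"
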
cfg.EnvVariables[keySinkBigqueryTablePartitionExpiryMs] = cfg.Sink.SinkBigquery.SinkBigqueryTablePartitionExpiryMs + cfg.EnvVariables[keySinkBigqueryDatasetLocation] = cfg.Sink.SinkBigquery.SinkBigqueryDatasetLocation + cfg.EnvVariables[keySinkBigqueryBatchSize] = cfg.Sink.SinkBigquery.SinkBigqueryBatchSize + cfg.EnvVariables[keySinkBigqueryTableClusteringEnable] = cfg.Sink.SinkBigquery.SinkBigqueryTableClusteringEnable + cfg.EnvVariables[keySinkBigqueryTableClusteringKeys] = cfg.Sink.SinkBigquery.SinkBigqueryTableClusteringKeys + cfg.EnvVariables[keySinkErrorTypesForFailure] = cfg.Sink.SinkBigquery.SinkErrorTypesForFailure + } + + //transformation #14 + cfg.Resources = mergeResources(dc.Resources, cfg.Resources) + + cfg.PrometheusURL = flinkOut.PrometheusURL + cfg.FlinkName = flinkOut.FlinkName + + if cfg.Replicas <= 0 { + cfg.Replicas = 1 + } + + if err := validateConfig(confJSON); err != nil { + return nil, err + } + + return &cfg, nil +} + +func incrementGroupId(groupId string, step int) string { + incrementNumberInString := func(number string) int { + num, _ := strconv.Atoi(number) + return num + step + } + + leftZeroPad := func(number int) string { + return fmt.Sprintf("%04d", number) + } + + getLastAndRestFromArray := func(arr []string) ([]string, string) { + return arr[:len(arr)-1], arr[len(arr)-1] + } + + parts := strings.Split(groupId, "-") + name, number := getLastAndRestFromArray(parts) + updatedNumber := leftZeroPad(incrementNumberInString(number)) + return strings.Join(append(name, updatedNumber), "-") +} + +func mustMarshalJSON(v interface{}) []byte { + data, err := json.Marshal(v) + if err != nil { + panic(fmt.Sprintf("failed to marshal JSON: %v", err)) + } + return data +} + +func mergeResources(oldResources, newResources Resources) Resources { + if newResources.TaskManager.CPU == "" { + newResources.TaskManager.CPU = oldResources.TaskManager.CPU + } + if newResources.TaskManager.Memory == "" { + newResources.TaskManager.Memory = oldResources.TaskManager.Memory + } + if newResources.JobManager.CPU == "" { + newResources.JobManager.CPU = oldResources.JobManager.CPU + } + if newResources.JobManager.Memory == "" { + newResources.JobManager.Memory = oldResources.JobManager.Memory + } + return newResources +} diff --git a/modules/dagger/driver.go b/modules/dagger/driver.go new file mode 100644 index 0000000..1621dba --- /dev/null +++ b/modules/dagger/driver.go @@ -0,0 +1,243 @@ +package dagger + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "html/template" + "strings" + "time" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/core/resource" + "github.com/goto/entropy/modules" + "github.com/goto/entropy/modules/kubernetes" + "github.com/goto/entropy/pkg/errors" + "github.com/goto/entropy/pkg/helm" + "github.com/goto/entropy/pkg/kube" +) + +const ( + stepReleaseCreate = "release_create" + stepReleaseUpdate = "release_update" +) + +const ( + chartRepo = "https://goto.github.io/charts/" + chartName = "dagger-deployment-chart" + imageRepo = "gotocompany/dagger" +) + +const ( + labelsConfKey = "extra_labels" + + labelDeployment = "deployment" + labelOrchestrator = "orchestrator" + labelURN = "urn" + labelName = "name" + labelNamespace = "namespace" + + orchestratorLabelValue = "entropy" +) + +const defaultKey = "default" + +var defaultDriverConf = driverConf{ + Namespace: map[string]string{ + defaultKey: "dagger", + }, + ChartValues: ChartValues{ + ChartVersion: "0.1.0", + }, +} + +type daggerDriver struct { + timeNow func() 
time.Time + conf driverConf + kubeDeploy kubeDeployFn + kubeGetPod kubeGetPodFn + kubeGetCRD kubeGetCRDFn +} + +type ( + kubeDeployFn func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error + kubeGetPodFn func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) + kubeGetCRDFn func(ctx context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error) +) + +type driverConf struct { + // Labels to be injected to the chart during deployment. Values can be Go templates. + Labels map[string]string `json:"labels,omitempty"` + + // Namespace is the kubernetes namespace where firehoses will be deployed. + Namespace map[string]string `json:"namespace" validate:"required"` + + // ChartValues is the chart and image version information. + ChartValues ChartValues `json:"chart_values" validate:"required"` + + EnvVariables map[string]string `json:"env_variables,omitempty"` + + Resources Resources `json:"resources" validate:"required"` + + JarURI string `json:"jar_uri" validate:"required"` + + // timeout value for a kube deployment run + KubeDeployTimeout int `json:"kube_deploy_timeout_seconds"` +} + +type Output struct { + State string `json:"state,omitempty"` + JMDeployStatus string `json:"jm_deploy_status,omitempty"` + JobStatus string `json:"job_status,omitempty"` + Reconcilation string `json:"reconcilation,omitempty"` + Pods []kube.Pod `json:"pods,omitempty"` + Namespace string `json:"namespace,omitempty"` + JobID string `json:"job_id,omitempty"` +} + +type transientData struct { + PendingSteps []string `json:"pending_steps"` +} + +func mergeChartValues(cur, newVal *ChartValues) (*ChartValues, error) { + if newVal == nil { + return cur, nil + } + + merged := ChartValues{ + ChartVersion: newVal.ChartVersion, + } + + return &merged, nil +} + +func readOutputData(exr module.ExpandedResource) (*Output, error) { + var curOut Output + if len(exr.Resource.State.Output) == 0 { + return &curOut, nil + } + if err := json.Unmarshal(exr.Resource.State.Output, &curOut); err != nil { + return nil, errors.ErrInternal.WithMsgf("corrupted output").WithCausef(err.Error()) + } + return &curOut, nil +} + +func readTransientData(exr module.ExpandedResource) (*transientData, error) { + if len(exr.Resource.State.ModuleData) == 0 { + return &transientData{}, nil + } + + var modData transientData + if err := json.Unmarshal(exr.Resource.State.ModuleData, &modData); err != nil { + return nil, errors.ErrInternal.WithMsgf("corrupted transient data").WithCausef(err.Error()) + } + return &modData, nil +} + +func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config, + kubeOut kubernetes.Output, +) (*helm.ReleaseConfig, error) { + + entropyLabels := map[string]string{ + labelDeployment: conf.DeploymentID, + labelOrchestrator: orchestratorLabelValue, + } + + otherLabels := map[string]string{ + labelURN: res.URN, + labelName: res.Name, + labelNamespace: conf.Namespace, + } + + deploymentLabels, err := renderTpl(dd.conf.Labels, modules.CloneAndMergeMaps(res.Labels, modules.CloneAndMergeMaps(entropyLabels, otherLabels))) + if err != nil { + return nil, err + } + + rc := helm.DefaultReleaseConfig() + rc.Timeout = dd.conf.KubeDeployTimeout + rc.Name = conf.DeploymentID + rc.Repository = chartRepo + rc.Chart = chartName + rc.Namespace = conf.Namespace + rc.ForceUpdate = true + rc.Version = conf.ChartValues.ChartVersion + + imageRepository := dd.conf.ChartValues.ImageRepository + if conf.ChartValues.ImageRepository != "" { 
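    // Note on this override branch: mergeChartValues (above) keeps only
    // ChartVersion from incoming values, so configs written by Plan reach this
    // point with an empty ImageRepository and fall through to the driver-level
    // default in dd.conf.ChartValues. Only state predating the merge (or
    // hand-edited state) takes this override path.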
+ imageRepository = conf.ChartValues.ImageRepository + } + + var programArgs []string + for key, value := range conf.EnvVariables { + // Check if the value is a JSON object and escape quotes if necessary + if json.Valid([]byte(value)) { + value = strings.ReplaceAll(value, `"`, `\"`) + } + programArgs = append(programArgs, fmt.Sprintf("\"%s\"", "--"+key), fmt.Sprintf("\"%v\"", value)) + } + + //fmt.Printf("programArgs: %v\n", programArgs) + formatted := fmt.Sprintf("[%s]", strings.Join(programArgs, ",")) + //fmt.Printf("formatted: %v\n", formatted) + encodedProgramArgs := base64.StdEncoding.EncodeToString([]byte(formatted)) + + rc.Values = map[string]any{ + labelsConfKey: modules.CloneAndMergeMaps(deploymentLabels, entropyLabels), + "image": imageRepository, + "deployment_id": conf.DeploymentID, + "configuration": map[string]any{ + "FLINK_PARALLELISM": conf.Replicas, + }, + "projectID": res.Project, + "name": res.Name, + "team": conf.Team, + "flink_name": conf.FlinkName, + "prometheus_url": conf.PrometheusURL, + "resources": map[string]any{ + "jobmanager": map[string]any{ + "cpu": conf.Resources.JobManager.CPU, + "memory": conf.Resources.JobManager.Memory, + }, + "taskmanager": map[string]any{ + "cpu": conf.Resources.TaskManager.CPU, + "memory": conf.Resources.JobManager.Memory, + }, + }, + "jarURI": conf.JarURI, + "programArgs": append([]string{"--encodedArgs"}, encodedProgramArgs), + "state": "running", + "namespace": conf.Namespace, + } + + return rc, nil +} + +// TODO: move this to pkg +func renderTpl(labelsTpl map[string]string, labelsValues map[string]string) (map[string]string, error) { + const useZeroValueForMissingKey = "missingkey=zero" + + finalLabels := map[string]string{} + for k, v := range labelsTpl { + var buf bytes.Buffer + t, err := template.New("").Option(useZeroValueForMissingKey).Parse(v) + if err != nil { + return nil, errors.ErrInvalid. + WithMsgf("label template for '%s' is invalid", k).WithCausef(err.Error()) + } else if err := t.Execute(&buf, labelsValues); err != nil { + return nil, errors.ErrInvalid. 
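An aside on the programArgs wiring in getHelmRelease above: each env var becomes a quoted "--KEY","value" pair, the whole list is serialised as a JSON array of strings, base64-encoded, and handed to the chart as ["--encodedArgs", <payload>]. A sketch of the decode side (hypothetical consumer inside the dagger job):

    raw, err := base64.StdEncoding.DecodeString(payload)
    if err != nil {
        return err
    }
    var args []string // e.g. ["--STREAMS", "[...]", "--SINK_TYPE", "INFLUX", ...]
    if err := json.Unmarshal(raw, &args); err != nil {
        return err
    }
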
+ WithMsgf("failed to render label template").WithCausef(err.Error()) + } + + // allow empty values + // labelVal := strings.TrimSpace(buf.String()) + // if labelVal == "" { + // continue + // } + + finalLabels[k] = buf.String() + } + return finalLabels, nil +} diff --git a/modules/dagger/driver_output.go b/modules/dagger/driver_output.go new file mode 100644 index 0000000..6b3f387 --- /dev/null +++ b/modules/dagger/driver_output.go @@ -0,0 +1,60 @@ +package dagger + +import ( + "context" + "encoding/json" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/core/resource" + "github.com/goto/entropy/modules" + "github.com/goto/entropy/modules/flink" + "github.com/goto/entropy/modules/kubernetes" + "github.com/goto/entropy/pkg/errors" +) + +func (dd *daggerDriver) Output(ctx context.Context, exr module.ExpandedResource) (json.RawMessage, error) { + output, err := readOutputData(exr) + if err != nil { + return nil, err + } + + conf, err := readConfig(exr, exr.Spec.Configs, dd.conf) + if err != nil { + return nil, errors.ErrInternal.WithCausef(err.Error()) + } + + var flinkOut flink.Output + if err := json.Unmarshal(exr.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil { + return nil, errors.ErrInternal.WithMsgf("invalid kube state").WithCausef(err.Error()) + } + + return dd.refreshOutput(ctx, exr.Resource, *conf, *output, flinkOut.KubeCluster) +} + +func (dd *daggerDriver) refreshOutput(ctx context.Context, r resource.Resource, + conf Config, output Output, kubeOut kubernetes.Output, +) (json.RawMessage, error) { + rc, err := dd.getHelmRelease(r, conf, kubeOut) + if err != nil { + return nil, err + } + + pods, err := dd.kubeGetPod(ctx, kubeOut.Configs, rc.Namespace, map[string]string{"app": conf.DeploymentID}) + if err != nil { + return nil, errors.ErrInternal.WithCausef(err.Error()) + } + output.Pods = pods + output.Namespace = conf.Namespace + output.JobID = conf.DeploymentID + + crd, err := dd.kubeGetCRD(ctx, kubeOut.Configs, rc.Namespace, rc.Name) + if err != nil { + return nil, errors.ErrInternal.WithCausef(err.Error()) + } + + output.JMDeployStatus = crd.JMDeployStatus + output.JobStatus = crd.JobStatus + output.Reconcilation = crd.Reconciliation + + return modules.MustJSON(output), nil +} diff --git a/modules/dagger/driver_plan.go b/modules/dagger/driver_plan.go new file mode 100644 index 0000000..f18d245 --- /dev/null +++ b/modules/dagger/driver_plan.go @@ -0,0 +1,144 @@ +package dagger + +import ( + "context" + "encoding/json" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/core/resource" + "github.com/goto/entropy/modules" + "github.com/goto/entropy/modules/flink" + "github.com/goto/entropy/pkg/errors" +) + +const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET" + +func (dd *daggerDriver) Plan(_ context.Context, exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { + switch act.Name { + case module.CreateAction: + return dd.planCreate(exr, act) + + default: + return dd.planChange(exr, act) + } +} + +func (dd *daggerDriver) planCreate(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { + conf, err := readConfig(exr, act.Params, dd.conf) + if err != nil { + return nil, err + } + + //transformation #12 + conf.EnvVariables[keyStreams] = string(mustMarshalJSON(conf.Source)) + + chartVals, err := mergeChartValues(&dd.conf.ChartValues, conf.ChartValues) + if err != nil { + return nil, err + } + conf.ChartValues = chartVals + + immediately 
:= dd.timeNow() + conf.JarURI = dd.conf.JarURI + + exr.Resource.Spec.Configs = modules.MustJSON(conf) + + err = dd.validateHelmReleaseConfigs(exr, *conf) + if err != nil { + return nil, err + } + + exr.Resource.State = resource.State{ + Status: resource.StatusPending, + Output: modules.MustJSON(Output{ + Namespace: conf.Namespace, + }), + NextSyncAt: &immediately, + ModuleData: modules.MustJSON(transientData{ + PendingSteps: []string{stepReleaseCreate}, + }), + } + + return &exr.Resource, nil +} + +func (dd *daggerDriver) planChange(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { + curConf, err := readConfig(exr, exr.Resource.Spec.Configs, dd.conf) + if err != nil { + return nil, err + } + + switch act.Name { + case module.UpdateAction: + newConf, err := readConfig(exr, act.Params, dd.conf) + if err != nil { + return nil, err + } + + newConf.Source = mergeConsumerGroupId(curConf.Source, newConf.Source) + + chartVals, err := mergeChartValues(curConf.ChartValues, newConf.ChartValues) + if err != nil { + return nil, err + } + + // restore configs that are not user-controlled. + newConf.DeploymentID = curConf.DeploymentID + newConf.ChartValues = chartVals + newConf.JarURI = curConf.JarURI + + newConf.Resources = mergeResources(curConf.Resources, newConf.Resources) + + curConf = newConf + } + + immediately := dd.timeNow() + + exr.Resource.Spec.Configs = modules.MustJSON(curConf) + + err = dd.validateHelmReleaseConfigs(exr, *curConf) + if err != nil { + return nil, err + } + + exr.Resource.State = resource.State{ + Status: resource.StatusPending, + Output: exr.Resource.State.Output, + ModuleData: modules.MustJSON(transientData{ + PendingSteps: []string{stepReleaseUpdate}, + }), + NextSyncAt: &immediately, + } + + return &exr.Resource, nil +} + +func (dd *daggerDriver) validateHelmReleaseConfigs(expandedResource module.ExpandedResource, config Config) error { + var flinkOut flink.Output + if err := json.Unmarshal(expandedResource.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil { + return errors.ErrInternal.WithMsgf("invalid kube state").WithCausef(err.Error()) + } + + _, err := dd.getHelmRelease(expandedResource.Resource, config, flinkOut.KubeCluster) + return err +} + +func mergeConsumerGroupId(currStreams, newStreams []Source) []Source { + if len(currStreams) != len(newStreams) { + return newStreams + } + + for i := range currStreams { + if currStreams[i].SourceParquet.SourceParquetFilePaths != nil && len(currStreams[i].SourceParquet.SourceParquetFilePaths) > 0 { + //source is parquete + //do nothing + continue + } + + if currStreams[i].SourceKafka.SourceKafkaName == newStreams[i].SourceKafka.SourceKafkaName { + newStreams[i].SourceKafka.SourceKafkaConsumerConfigGroupID = currStreams[i].SourceKafka.SourceKafkaConsumerConfigGroupID + } + } + + return newStreams +} diff --git a/modules/dagger/driver_sync.go b/modules/dagger/driver_sync.go new file mode 100644 index 0000000..34885d1 --- /dev/null +++ b/modules/dagger/driver_sync.go @@ -0,0 +1,88 @@ +package dagger + +import ( + "context" + "encoding/json" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/core/resource" + "github.com/goto/entropy/modules" + "github.com/goto/entropy/modules/flink" + "github.com/goto/entropy/modules/kubernetes" + "github.com/goto/entropy/pkg/errors" +) + +func (dd *daggerDriver) Sync(ctx context.Context, exr module.ExpandedResource) (*resource.State, error) { + modData, err := readTransientData(exr) + if err != nil { + return nil, err + } + + 
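The update path above preserves consumer-group continuity: mergeConsumerGroupId carries a group ID forward only when the stream count is unchanged and the kafka source name matches position-for-position. A small usage sketch:

    cur := []Source{{SourceKafka: SourceKafka{
        SourceKafkaName:                  "mainstream",
        SourceKafkaConsumerConfigGroupID: "orders-0001",
    }}}
    incoming := []Source{{SourceKafka: SourceKafka{SourceKafkaName: "mainstream"}}}

    merged := mergeConsumerGroupId(cur, incoming)
    // merged[0].SourceKafkaConsumerConfigGroupID == "orders-0001" (kept);
    // a renamed source or a changed stream count takes the incoming value
    // instead, and readConfig then assigns a fresh group ID.
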
out, err := readOutputData(exr) + if err != nil { + return nil, errors.ErrInternal.WithCausef(err.Error()) + } + + conf, err := readConfig(exr, exr.Spec.Configs, dd.conf) + if err != nil { + return nil, errors.ErrInternal.WithCausef(err.Error()) + } + + var flinkOut flink.Output + if err := json.Unmarshal(exr.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil { + return nil, errors.ErrInternal.WithMsgf("invalid flink state").WithCausef(err.Error()) + } + + finalState := resource.State{ + Status: resource.StatusPending, + Output: exr.Resource.State.Output, + } + + if len(modData.PendingSteps) > 0 { + pendingStep := modData.PendingSteps[0] + modData.PendingSteps = modData.PendingSteps[1:] + + switch pendingStep { + case stepReleaseCreate, stepReleaseUpdate: + isCreate := pendingStep == stepReleaseCreate + if err := dd.releaseSync(ctx, exr.Resource, isCreate, *conf, flinkOut.KubeCluster); err != nil { + return nil, err + } + default: + return nil, errors.ErrInternal.WithMsgf("unknown step: '%s'", pendingStep) + } + + // we have more pending states, so enqueue resource for another sync + // as soon as possible. + immediately := dd.timeNow() + finalState.NextSyncAt = &immediately + finalState.ModuleData = modules.MustJSON(modData) + + return &finalState, nil + } + + finalOut, err := dd.refreshOutput(ctx, exr.Resource, *conf, *out, flinkOut.KubeCluster) + if err != nil { + return nil, err + } + finalState.Output = finalOut + + finalState.Status = resource.StatusCompleted + finalState.ModuleData = nil + return &finalState, nil + +} + +func (dd *daggerDriver) releaseSync(ctx context.Context, r resource.Resource, + isCreate bool, conf Config, kubeOut kubernetes.Output, +) error { + rc, err := dd.getHelmRelease(r, conf, kubeOut) + if err != nil { + return err + } + + if err := dd.kubeDeploy(ctx, isCreate, kubeOut.Configs, *rc); err != nil { + return errors.ErrInternal.WithCausef(err.Error()) + } + return nil +} diff --git a/modules/dagger/module.go b/modules/dagger/module.go new file mode 100644 index 0000000..bfc4149 --- /dev/null +++ b/modules/dagger/module.go @@ -0,0 +1,127 @@ +package dagger + +import ( + "context" + _ "embed" + "encoding/json" + "time" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/modules/flink" + "github.com/goto/entropy/pkg/errors" + "github.com/goto/entropy/pkg/helm" + "github.com/goto/entropy/pkg/kube" + "github.com/goto/entropy/pkg/validator" + "helm.sh/helm/v3/pkg/release" + v1 "k8s.io/api/core/v1" +) + +const ( + keyFlinkDependency = "flink" +) + +type FlinkCRDStatus struct { + JobManagerDeploymentStatus string `json:"jobManagerDeploymentStatus"` + JobStatus string `json:"jobStatus"` + ReconciliationStatus string `json:"reconciliationStatus"` +} + +var Module = module.Descriptor{ + Kind: "dagger", + Dependencies: map[string]string{ + keyFlinkDependency: flink.Module.Kind, + }, + Actions: []module.ActionDesc{ + { + Name: module.CreateAction, + Description: "Creates a new dagger", + }, + { + Name: module.UpdateAction, + Description: "Updates an existing dagger", + }, + }, + DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) { + conf := defaultDriverConf // clone the default value + if err := json.Unmarshal(confJSON, &conf); err != nil { + return nil, err + } else if err := validator.TaggedStruct(conf); err != nil { + return nil, err + } + + return &daggerDriver{ + conf: conf, + timeNow: time.Now, + kubeDeploy: func(_ context.Context, isCreate bool, kubeConf kube.Config, hc helm.ReleaseConfig) error { + canUpdate := 
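One note on the sync loop just shown: Sync drains transientData.PendingSteps one step per invocation, setting NextSyncAt to now so the worker re-enqueues the resource immediately, and only refreshes the output and reports StatusCompleted once the queue is empty. The module data round-trips as plain JSON:

    // state.ModuleData after planCreate:   {"pending_steps":["release_create"]}
    // after the first Sync pops the step:  {"pending_steps":[]}
    // next Sync: queue empty -> refresh output, StatusCompleted, ModuleData nil
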
func(rel *release.Release) bool { + curLabels, ok := rel.Config[labelsConfKey].(map[string]any) + if !ok { + return false + } + newLabels, ok := hc.Values[labelsConfKey].(map[string]string) + if !ok { + return false + } + + isManagedByEntropy := curLabels[labelOrchestrator] == orchestratorLabelValue + isSameDeployment := curLabels[labelDeployment] == newLabels[labelDeployment] + + return isManagedByEntropy && isSameDeployment + } + + helmCl := helm.NewClient(&helm.Config{Kubernetes: kubeConf}) + _, errHelm := helmCl.Upsert(&hc, canUpdate) + return errHelm + }, + kubeGetPod: func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) { + kubeCl, err := kube.NewClient(ctx, conf) + if err != nil { + return nil, errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver kube get pod").WithCausef(err.Error()) + } + return kubeCl.GetPodDetails(ctx, ns, labels, func(pod v1.Pod) bool { + // allow pods that are in running state and are not marked for deletion + return pod.Status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil + }) + }, + kubeGetCRD: func(ctx context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error) { + kubeCl, err := kube.NewClient(ctx, conf) + if err != nil { + return kube.FlinkDeploymentStatus{}, errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver kube get pod").WithCausef(err.Error()) + } + crd, err := kubeCl.GetCRDDetails(ctx, ns, name) + if err != nil { + return kube.FlinkDeploymentStatus{}, err + } + return parseFlinkCRDStatus(crd.Object) + }}, nil + }, +} + +func parseFlinkCRDStatus(flinkDeployment map[string]interface{}) (kube.FlinkDeploymentStatus, error) { + var flinkCRDStatus FlinkCRDStatus + statusInterface, ok := flinkDeployment["status"].(map[string]interface{}) + if !ok { + return kube.FlinkDeploymentStatus{}, errors.ErrInternal.WithMsgf("failed to convert flink deployment status to map[string]interface{}") + } + + if jmStatus, ok := statusInterface["jobManagerDeploymentStatus"].(string); ok { + flinkCRDStatus.JobManagerDeploymentStatus = jmStatus + } + if jobStatus, ok := statusInterface["jobStatus"].(map[string]interface{}); ok { + if st, ok := jobStatus["state"].(string); ok { + flinkCRDStatus.JobStatus = st + } + } + if reconciliationStatus, ok := statusInterface["reconciliationStatus"].(map[string]interface{}); ok { + if st, ok := reconciliationStatus["state"].(string); ok { + flinkCRDStatus.ReconciliationStatus = st + } + } + + status := kube.FlinkDeploymentStatus{ + JMDeployStatus: flinkCRDStatus.JobManagerDeploymentStatus, + JobStatus: flinkCRDStatus.JobStatus, + Reconciliation: flinkCRDStatus.ReconciliationStatus, + } + return status, nil +} diff --git a/modules/dagger/schema/config.json b/modules/dagger/schema/config.json new file mode 100644 index 0000000..3effabd --- /dev/null +++ b/modules/dagger/schema/config.json @@ -0,0 +1,49 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": [ + "replicas", + "env_variables", + "team", + "source", + "sink", + "sink_type" + ], + "properties": { + "replicas": { + "type": "number", + "default": 1, + "minimum": 1 + }, + "deployment_id": { + "type": "string" + }, + "sink_type": { + "type": "string", + "enum": [ + "INFLUX", + "KAFKA", + "BIGQUERY" + ] + }, + "env_variables": { + "type": "object", + "additionalProperties": true, + "required": [ + "SINK_TYPE" + ], + "properties": { + "SINK_TYPE": 
{ + "type": "string", + "enum": [ + "INFLUX", + "KAFKA", + "BIGQUERY" + ] + } + } + } + } + } + \ No newline at end of file diff --git a/modules/flink/config.go b/modules/flink/config.go index 24a06a2..6ea778a 100644 --- a/modules/flink/config.go +++ b/modules/flink/config.go @@ -26,6 +26,8 @@ type Config struct { KubeNamespace string `json:"kube_namespace,omitempty"` Influx Influx `json:"influx,omitempty"` SinkKafkaStream string `json:"sink_kafka_stream,omitempty"` + PrometheusURL string `json:"prometheus_url,omitempty"` + FlinkName string `json:"flink_name,omitempty"` } func readConfig(_ resource.Resource, confJSON json.RawMessage, dc driverConf) (*Config, error) { @@ -48,5 +50,13 @@ func readConfig(_ resource.Resource, confJSON json.RawMessage, dc driverConf) (* cfg.KubeNamespace = dc.KubeNamespace } + if cfg.PrometheusURL == "" { + cfg.PrometheusURL = dc.PrometheusURL + } + + if cfg.FlinkName == "" { + cfg.FlinkName = dc.FlinkName + } + return &cfg, nil } diff --git a/modules/flink/driver.go b/modules/flink/driver.go index 64a5365..9fbb0b2 100644 --- a/modules/flink/driver.go +++ b/modules/flink/driver.go @@ -16,6 +16,8 @@ type driverConf struct { Influx Influx `json:"influx,omitempty"` SinkKafkaStream string `json:"sink_kafka_stream,omitempty"` KubeNamespace string `json:"kube_namespace,omitempty"` + PrometheusURL string `json:"prometheus_url,omitempty"` + FlinkName string `json:"flink_name,omitempty"` } type Output struct { @@ -23,6 +25,8 @@ type Output struct { KubeNamespace string `json:"kube_namespace,omitempty"` Influx Influx `json:"influx,omitempty"` SinkKafkaStream string `json:"sink_kafka_stream,omitempty"` + PrometheusURL string `json:"prometheus_url,omitempty"` + FlinkName string `json:"flink_name,omitempty"` } func readOutputData(exr module.ExpandedResource) (*Output, error) { diff --git a/modules/flink/driver_output.go b/modules/flink/driver_output.go index 2182308..58596a9 100644 --- a/modules/flink/driver_output.go +++ b/modules/flink/driver_output.go @@ -33,6 +33,8 @@ func (fd *flinkDriver) Output(ctx context.Context, exr module.ExpandedResource) output.Influx = conf.Influx output.KubeNamespace = conf.KubeNamespace output.SinkKafkaStream = conf.SinkKafkaStream + output.PrometheusURL = conf.PrometheusURL + output.FlinkName = conf.FlinkName return modules.MustJSON(output), nil } diff --git a/modules/flink/schema/config.json b/modules/flink/schema/config.json index e9e5bd4..a78df50 100644 --- a/modules/flink/schema/config.json +++ b/modules/flink/schema/config.json @@ -20,7 +20,7 @@ } } }, - "sink_kafka_stream_name": { + "sink_kafka_stream": { "type": "string" } } diff --git a/pkg/helm/helm.go b/pkg/helm/helm.go index c032a1f..c06b59f 100644 --- a/pkg/helm/helm.go +++ b/pkg/helm/helm.go @@ -70,20 +70,22 @@ func (p *Client) doCreate(actionConfig *action.Configuration, config *ReleaseCon } act := action.NewInstall(actionConfig) - act.Wait = config.Wait - act.DryRun = false + act.Wait = true + act.IncludeCRDs = true + act.SkipCRDs = false act.Timeout = time.Duration(config.Timeout) * time.Second act.Replace = config.Replace act.OutputDir = "" act.Namespace = config.Namespace act.ClientOnly = false act.Description = config.Description - act.WaitForJobs = config.WaitForJobs + act.WaitForJobs = true act.ReleaseName = config.Name act.GenerateName = false act.NameTemplate = "" act.CreateNamespace = config.CreateNamespace act.ChartPathOptions = *chartPathOpts + act.DryRun = false rel, err := act.Run(fetchedChart, config.Values) if err != nil { diff --git a/pkg/kube/client.go 
b/pkg/kube/client.go index af9f6fa..b4c15b2 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -15,9 +15,12 @@ import (
 batchv1 "k8s.io/api/batch/v1"
 corev1 "k8s.io/api/core/v1"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 "k8s.io/apimachinery/pkg/fields"
 "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime/schema"
 "k8s.io/apimachinery/pkg/selection"
+ "k8s.io/client-go/dynamic"
 "k8s.io/client-go/kubernetes"
 typedbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
 "k8s.io/client-go/rest"
@@ -55,6 +58,13 @@ type Pod struct {
 Status string `json:"status"`
 }
 
+type FlinkDeploymentStatus struct {
+ State string `json:"state"`
+ JMDeployStatus string `json:"jm_deploy_status"`
+ JobStatus string `json:"job_status"`
+ Reconciliation string `json:"reconciliation"`
+}
+
 type LogOptions struct {
 App string `mapstructure:"app"`
 Pod string `mapstructure:"pod"`
@@ -323,6 +333,29 @@ func (c Client) GetPodDetails(ctx context.Context, namespace string, labelSelect
 return podDetails, nil
 }
 
+func (c Client) GetCRDDetails(ctx context.Context, namespace string, name string) (*unstructured.Unstructured, error) {
+ // Initialize the dynamic client
+ dynamicClient, err := dynamic.NewForConfig(&c.restConfig)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create dynamic client: %v", err)
+ }
+
+ // Define the GVR (GroupVersionResource) for the FlinkDeployment custom resource
+ gvr := schema.GroupVersionResource{
+ Group: "flink.apache.org",
+ Version: "v1beta1",
+ Resource: "flinkdeployments",
+ }
+
+ // Fetch the named FlinkDeployment resource, propagating the caller's context
+ flinkDeployment, err := dynamicClient.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
+ if err != nil {
+ return nil, fmt.Errorf("failed to get FlinkDeployment: %v", err)
+ }
+
+ return flinkDeployment, nil
+}
+
 func streamContainerLogs(ctx context.Context, ns, podName string, logCh chan<- LogChunk,
 clientSet *kubernetes.Clientset, podLogOpts corev1.PodLogOptions,
 ) error {
diff --git a/test/testbench/test_data/resource/flink_resource.json b/test/testbench/test_data/resource/flink_resource.json
index ada2bf2..d68f0a5 100644
--- a/test/testbench/test_data/resource/flink_resource.json
+++ b/test/testbench/test_data/resource/flink_resource.json
@@ -13,7 +13,7 @@
 "username": "influx-user"
 },
 "kube_namespace": "flink-ns",
- "sink_kafka_stream_name": "flinkstream"
+ "sink_kafka_stream": "flinkstream"
 },
 "dependencies": [
 {
From 2eb0f5684f4103ca9e4dcc419509250cfa366427 Mon Sep 17 00:00:00 2001
From: Ishan Arya <ishanarya0@gmail.com>
Date: Mon, 30 Sep 2024 14:55:53 +0530
Subject: [PATCH 7/8] feat: dagger actions (#114)

* feat: add dagger module
* feat: add flink dep
* feat: add transformations
* fix: read stream from config root
* feat: add Plan implementation
* fix: chart values
* fix: resolve TODOs and refactor
* fix: source sink base handling
* feat: Output to have CR details
* feat: handle status
* refactor: separate constants by type
* refactor: kubeGetCRD function
* feat: add dagger update action
* fix: add Update action
* chore: change var name to sink_kafka_stream
* feat: merge consumer group ID if sink is same
* feat: add start, stop and reset actions (#112)
* feat: add start & update action
* feat: add reset action
---------
Co-authored-by: Ishan Arya <ishan.arya@gojek.com>
---------
Co-authored-by: Ishan Arya <ishan.arya@gojek.com>
---
 modules/dagger/config.go | 24 +++++++--------
 modules/dagger/driver.go | 26 +++++++++-------
 modules/dagger/driver_plan.go | 57 
++++++++++++++++++++++++++++++++++++++++++++++++++
 modules/dagger/driver_sync.go | 2 +-
 modules/dagger/module.go | 31 ++++++++++++++++++-
 5 files changed, 114 insertions(+), 26 deletions(-)

diff --git a/modules/dagger/config.go b/modules/dagger/config.go
index d389cda..69eeddf 100644
--- a/modules/dagger/config.go
+++ b/modules/dagger/config.go
@@ -111,6 +111,9 @@ type Config struct {
 Namespace string `json:"namespace,omitempty"`
 PrometheusURL string `json:"prometheus_url,omitempty"`
 JarURI string `json:"jar_uri,omitempty"`
+ State string `json:"state"`
+ JobState string `json:"job_state"`
+ ResetOffset string `json:"reset_offset"`
 }
 
 type ChartValues struct {
@@ -198,19 +201,17 @@ func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverCo
 //transformation #1
 source := cfg.Source
- for i := range source {
- if source[i].SourceParquet.SourceParquetFilePaths != nil && len(source[i].SourceParquet.SourceParquetFilePaths) > 0 {
- //source is parquete
- //do nothing
- continue
+ // kafka consumer settings only apply when the (first) source is not parquet-backed
+ if len(source[0].SourceParquet.SourceParquetFilePaths) == 0 {
+ for i := range source {
+ //TODO: check how to handle increment group id on update
+ if source[i].SourceKafkaConsumerConfigGroupID == "" {
+ source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(r.Name+"-0001", i)
+ }
+ source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable]
+ source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset]
+ source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers]
 }
- //TODO: check how to handle increment group id on update
- if source[i].SourceKafkaConsumerConfigGroupID == "" {
- source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(r.Name+"-0001", i)
- }
- source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable]
- source[i].SourceKafkaConsumerConfigAutoOffsetReset = dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset]
- source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers]
 }
 
 cfg.Source = source
diff --git a/modules/dagger/driver.go b/modules/dagger/driver.go
index 1621dba..2efb06d 100644
--- a/modules/dagger/driver.go
+++ b/modules/dagger/driver.go
@@ -22,6 +22,8 @@ import (
 const (
 stepReleaseCreate = "release_create"
 stepReleaseUpdate = "release_update"
+ stepReleaseStop = "release_stop"
+ stepKafkaReset = "kafka_reset"
 )
 
 const (
@@ -54,17 +56,19 @@ var defaultDriverConf = driverConf{
 }
 
 type daggerDriver struct {
- timeNow func() time.Time
- conf driverConf
- kubeDeploy kubeDeployFn
- kubeGetPod kubeGetPodFn
- kubeGetCRD kubeGetCRDFn
+ timeNow func() time.Time
+ conf driverConf
+ kubeDeploy kubeDeployFn
+ kubeGetPod kubeGetPodFn
+ kubeGetCRD kubeGetCRDFn
+ consumerReset consumerResetFn
 }
 
 type (
- kubeDeployFn func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error
- kubeGetPodFn func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error)
- kubeGetCRDFn func(ctx context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error)
+ kubeDeployFn func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error
+ kubeGetPodFn func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error)
+ kubeGetCRDFn func(ctx 
context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error)
+ consumerResetFn func(ctx context.Context, conf Config, resetTo string) []Source
 )
 
 type driverConf struct {
@@ -88,7 +92,6 @@ type driverConf struct {
 }
 
 type Output struct {
- State string `json:"state,omitempty"`
 JMDeployStatus string `json:"jm_deploy_status,omitempty"`
 JobStatus string `json:"job_status,omitempty"`
 Reconcilation string `json:"reconcilation,omitempty"`
@@ -98,7 +101,8 @@ type Output struct {
 }
 
 type transientData struct {
- PendingSteps []string `json:"pending_steps"`
+ PendingSteps []string `json:"pending_steps"`
+ ResetOffsetTo string `json:"reset_offset_to,omitempty"`
 }
 
 func mergeChartValues(cur, newVal *ChartValues) (*ChartValues, error) {
@@ -208,7 +212,7 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config,
 },
 "jarURI": conf.JarURI,
 "programArgs": append([]string{"--encodedArgs"}, encodedProgramArgs),
- "state": "running",
+ "state": conf.JobState,
 "namespace": conf.Namespace,
 }
 
diff --git a/modules/dagger/driver_plan.go b/modules/dagger/driver_plan.go
index f18d245..9fd5a96 100644
--- a/modules/dagger/driver_plan.go
+++ b/modules/dagger/driver_plan.go
@@ -9,15 +9,25 @@ import (
 "github.com/goto/entropy/modules"
 "github.com/goto/entropy/modules/flink"
 "github.com/goto/entropy/pkg/errors"
+ "github.com/goto/entropy/pkg/kafka"
 )
 
 const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
+const (
+ JobStateRunning = "running"
+ JobStateSuspended = "suspended"
+ StateDeployed = "DEPLOYED"
+ StateUserStopped = "USER_STOPPED"
+)
 
-func (dd *daggerDriver) Plan(_ context.Context, exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
+func (dd *daggerDriver) Plan(ctx context.Context, exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
 switch act.Name {
 case module.CreateAction:
 return dd.planCreate(exr, act)
+ case ResetAction:
+ return dd.planReset(ctx, exr, act)
+
 default:
 return dd.planChange(exr, act)
 }
@@ -40,6 +50,8 @@ func (dd *daggerDriver) planCreate(exr module.ExpandedResource, act module.Actio
 immediately := dd.timeNow()
 
 conf.JarURI = dd.conf.JarURI
+ conf.State = StateDeployed
+ conf.JobState = JobStateRunning
 
 exr.Resource.Spec.Configs = modules.MustJSON(conf)
 
@@ -86,12 +98,22 @@ func (dd *daggerDriver) planChange(exr module.ExpandedResource, act module.Actio
 newConf.DeploymentID = curConf.DeploymentID
 newConf.ChartValues = chartVals
 newConf.JarURI = curConf.JarURI
+ newConf.State = StateDeployed
+ newConf.JobState = JobStateRunning
 
 newConf.Resources = mergeResources(curConf.Resources, newConf.Resources)
 
 curConf = newConf
- }
 
+ case StopAction:
+ curConf.State = StateUserStopped
+ curConf.JobState = JobStateSuspended
+
+ case StartAction:
+ curConf.State = StateDeployed
+ curConf.JobState = JobStateRunning
+
+ }
 immediately := dd.timeNow()
 
 exr.Resource.Spec.Configs = modules.MustJSON(curConf)
@@ -113,6 +135,39 @@
 return &exr.Resource, nil
 }
 
+func (dd *daggerDriver) planReset(ctx context.Context, exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
+ resetValue, err := kafka.ParseResetV2Params(act.Params)
+ if err != nil {
+ return nil, err
+ }
+
+ immediately := dd.timeNow()
+
+ curConf, err := readConfig(exr, exr.Resource.Spec.Configs, dd.conf)
+ if err != nil {
+ return nil, err
+ }
+
+ curConf.ResetOffset = resetValue
+
+ curConf.Source = dd.consumerReset(ctx, *curConf, resetValue)
+ curConf.EnvVariables[keyStreams] = string(mustMarshalJSON(curConf.Source))
+
+ 
exr.Resource.Spec.Configs = modules.MustJSON(curConf) + exr.Resource.State = resource.State{ + Status: resource.StatusPending, + Output: exr.Resource.State.Output, + NextSyncAt: &immediately, + ModuleData: modules.MustJSON(transientData{ + ResetOffsetTo: resetValue, + PendingSteps: []string{ + stepKafkaReset, + }, + }), + } + return &exr.Resource, nil +} + func (dd *daggerDriver) validateHelmReleaseConfigs(expandedResource module.ExpandedResource, config Config) error { var flinkOut flink.Output if err := json.Unmarshal(expandedResource.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil { diff --git a/modules/dagger/driver_sync.go b/modules/dagger/driver_sync.go index 34885d1..f5ebdcd 100644 --- a/modules/dagger/driver_sync.go +++ b/modules/dagger/driver_sync.go @@ -43,7 +43,7 @@ func (dd *daggerDriver) Sync(ctx context.Context, exr module.ExpandedResource) ( modData.PendingSteps = modData.PendingSteps[1:] switch pendingStep { - case stepReleaseCreate, stepReleaseUpdate: + case stepReleaseCreate, stepReleaseUpdate, stepReleaseStop, stepKafkaReset: isCreate := pendingStep == stepReleaseCreate if err := dd.releaseSync(ctx, exr.Resource, isCreate, *conf, flinkOut.KubeCluster); err != nil { return nil, err diff --git a/modules/dagger/module.go b/modules/dagger/module.go index bfc4149..47da1f8 100644 --- a/modules/dagger/module.go +++ b/modules/dagger/module.go @@ -18,6 +18,9 @@ import ( const ( keyFlinkDependency = "flink" + StopAction = "stop" + StartAction = "start" + ResetAction = "reset" ) type FlinkCRDStatus struct { @@ -40,6 +43,18 @@ var Module = module.Descriptor{ Name: module.UpdateAction, Description: "Updates an existing dagger", }, + { + Name: StopAction, + Description: "Suspends a running dagger", + }, + { + Name: StartAction, + Description: "Starts a suspended dagger", + }, + { + Name: ResetAction, + Description: "Resets the offset of a dagger", + }, }, DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) { conf := defaultDriverConf // clone the default value @@ -93,7 +108,9 @@ var Module = module.Descriptor{ return kube.FlinkDeploymentStatus{}, err } return parseFlinkCRDStatus(crd.Object) - }}, nil + }, + consumerReset: consumerReset, + }, nil }, } @@ -125,3 +142,15 @@ func parseFlinkCRDStatus(flinkDeployment map[string]interface{}) (kube.FlinkDepl } return status, nil } + +func consumerReset(ctx context.Context, conf Config, resetTo string) []Source { + baseGroup := conf.Source[0].SourceKafkaConsumerConfigGroupID + groupId := incrementGroupId(baseGroup, len(conf.Source)) + + for i := range conf.Source { + conf.Source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(groupId, i) + conf.Source[i].SourceKafkaConsumerConfigAutoOffsetReset = resetTo + } + + return conf.Source +} From 9df3622b8fd1d4a6ad494b9a124186725db47d64 Mon Sep 17 00:00:00 2001 From: Ishan Arya <ishan.arya@gojek.com> Date: Mon, 30 Sep 2024 15:16:32 +0530 Subject: [PATCH 8/8] feat: add extra streams --- modules/flink/config.go | 11 ++++++----- modules/flink/driver.go | 1 + modules/flink/driver_output.go | 1 + 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/modules/flink/config.go b/modules/flink/config.go index 6ea778a..c13a65a 100644 --- a/modules/flink/config.go +++ b/modules/flink/config.go @@ -23,11 +23,12 @@ type Influx struct { } type Config struct { - KubeNamespace string `json:"kube_namespace,omitempty"` - Influx Influx `json:"influx,omitempty"` - SinkKafkaStream string `json:"sink_kafka_stream,omitempty"` - PrometheusURL string 
`json:"prometheus_url,omitempty"` - FlinkName string `json:"flink_name,omitempty"` + KubeNamespace string `json:"kube_namespace,omitempty"` + Influx Influx `json:"influx,omitempty"` + SinkKafkaStream string `json:"sink_kafka_stream,omitempty"` + PrometheusURL string `json:"prometheus_url,omitempty"` + FlinkName string `json:"flink_name,omitempty"` + ExtraStreams []string `json:"extra_streams,omitempty"` } func readConfig(_ resource.Resource, confJSON json.RawMessage, dc driverConf) (*Config, error) { diff --git a/modules/flink/driver.go b/modules/flink/driver.go index 9fbb0b2..9f20a2c 100644 --- a/modules/flink/driver.go +++ b/modules/flink/driver.go @@ -27,6 +27,7 @@ type Output struct { SinkKafkaStream string `json:"sink_kafka_stream,omitempty"` PrometheusURL string `json:"prometheus_url,omitempty"` FlinkName string `json:"flink_name,omitempty"` + ExtraStreams []string `json:"extra_streams,omitempty"` } func readOutputData(exr module.ExpandedResource) (*Output, error) { diff --git a/modules/flink/driver_output.go b/modules/flink/driver_output.go index 58596a9..4c413e2 100644 --- a/modules/flink/driver_output.go +++ b/modules/flink/driver_output.go @@ -35,6 +35,7 @@ func (fd *flinkDriver) Output(ctx context.Context, exr module.ExpandedResource) output.SinkKafkaStream = conf.SinkKafkaStream output.PrometheusURL = conf.PrometheusURL output.FlinkName = conf.FlinkName + output.ExtraStreams = conf.ExtraStreams return modules.MustJSON(output), nil }
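A note on the consumer-group rotation in the dagger patches above: both readConfig and consumerReset call incrementGroupId, whose definition lies outside the hunks shown here. The sketch below is illustrative rather than the patch's actual implementation; it assumes group ids carry a zero-padded numeric suffix, as the call site incrementGroupId(r.Name+"-0001", i) suggests, and bumps that suffix by n:

package dagger

import (
	"fmt"
	"strconv"
	"strings"
)

// incrementGroupId advances the numeric suffix of a consumer group id by n,
// e.g. "orders-dagger-0001" with n=2 yields "orders-dagger-0003". Ids without
// a parseable suffix simply get one appended. (Hypothetical sketch; the real
// helper lives elsewhere in modules/dagger.)
func incrementGroupId(group string, n int) string {
	idx := strings.LastIndex(group, "-")
	if idx >= 0 {
		if seq, err := strconv.Atoi(group[idx+1:]); err == nil {
			return fmt.Sprintf("%s-%04d", group[:idx], seq+n)
		}
	}
	return fmt.Sprintf("%s-%04d", group, n)
}

Under that assumption, consumerReset on a two-source config with base group "orders-dagger-0001" first computes incrementGroupId(base, 2) = "orders-dagger-0003", then assigns "orders-dagger-0003" and "orders-dagger-0004" to the sources, so every source rejoins Kafka under a brand-new group and the requested auto offset reset takes effect.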