From 0a925bf8da1a9c98e30e4e6edd3b5bb4e89910df Mon Sep 17 00:00:00 2001 From: Ishan Arya Date: Mon, 30 Sep 2024 14:32:01 +0530 Subject: [PATCH] feat: add dagger module create & update action (#107) * feat: add dagger module * feat: add flink dep * feat: add transformations * fix: read stream from config root * feat: add Plan implementation * fix: chart values * fix: resolve TODOs and refactored * fix: source sink base handling * feat: Output to have CR details * feat: handle status * refactor: seperate contants by type * refactor: kubeGetCRD function * feat: add dagger update action * fix: add Update action * chore: change var name to sink_kafka_stream * feat: merge consumer group ID if sink is same --------- Co-authored-by: Ishan Arya --- cli/serve.go | 2 + modules/dagger/config.go | 349 ++++++++++++++++++ modules/dagger/driver.go | 243 ++++++++++++ modules/dagger/driver_output.go | 60 +++ modules/dagger/driver_plan.go | 144 ++++++++ modules/dagger/driver_sync.go | 88 +++++ modules/dagger/module.go | 127 +++++++ modules/dagger/schema/config.json | 49 +++ modules/flink/config.go | 10 + modules/flink/driver.go | 4 + modules/flink/driver_output.go | 2 + modules/flink/schema/config.json | 2 +- pkg/helm/helm.go | 8 +- pkg/kube/client.go | 33 ++ .../test_data/resource/flink_resource.json | 2 +- 15 files changed, 1118 insertions(+), 5 deletions(-) create mode 100644 modules/dagger/config.go create mode 100644 modules/dagger/driver.go create mode 100644 modules/dagger/driver_output.go create mode 100644 modules/dagger/driver_plan.go create mode 100644 modules/dagger/driver_sync.go create mode 100644 modules/dagger/module.go create mode 100644 modules/dagger/schema/config.json diff --git a/cli/serve.go b/cli/serve.go index 12242013..095f33bd 100644 --- a/cli/serve.go +++ b/cli/serve.go @@ -14,6 +14,7 @@ import ( entropyserver "github.com/goto/entropy/internal/server" "github.com/goto/entropy/internal/store/postgres" "github.com/goto/entropy/modules" + "github.com/goto/entropy/modules/dagger" "github.com/goto/entropy/modules/firehose" "github.com/goto/entropy/modules/flink" "github.com/goto/entropy/modules/job" @@ -94,6 +95,7 @@ func setupRegistry() module.Registry { job.Module, kafka.Module, flink.Module, + dagger.Module, } registry := &modules.Registry{} diff --git a/modules/dagger/config.go b/modules/dagger/config.go new file mode 100644 index 00000000..d389cda7 --- /dev/null +++ b/modules/dagger/config.go @@ -0,0 +1,349 @@ +package dagger + +import ( + _ "embed" + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/modules" + "github.com/goto/entropy/modules/flink" + "github.com/goto/entropy/pkg/errors" + "github.com/goto/entropy/pkg/validator" +) + +const ( + helmReleaseNameMaxLength = 53 +) + +// Stream-related constants +const ( + keyStreams = "STREAMS" + keySinkType = "SINK_TYPE" +) + +// Flink-related constants +const ( + keyFlinkJobID = "FLINK_JOB_ID" +) + +// Influx-related constants +const ( + keySinkInfluxURL = "SINK_INFLUX_URL" + keySinkInfluxPassword = "SINK_INFLUX_PASSWORD" + keySinkInfluxDBName = "SINK_INFLUX_DB_NAME" + keySinkInfluxUsername = "SINK_INFLUX_USERNAME" + keySinkInfluxMeasurementName = "SINK_INFLUX_MEASUREMENT_NAME" +) + +// Kafka-related constants +const ( + SourceKafkaConsumerConfigAutoCommitEnable = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE" + SourceKafkaConsumerConfigAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET" + SourceKafkaConsumerConfigBootstrapServers = 
"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS" + keySinkKafkaBrokers = "SINK_KAFKA_BROKERS" + keySinkKafkaStream = "SINK_KAFKA_STREAM" + keySinkKafkaProtoMsg = "SINK_KAFKA_PROTO_MESSAGE" + keySinkKafkaTopic = "SINK_KAFKA_TOPIC" + keySinkKafkaKey = "SINK_KAFKA_PROTO_KEY" + keySinkKafkaLingerMs = "SINK_KAFKA_LINGER_MS" +) + +// Sink types +const ( + SinkTypeInflux = "INFLUX" + SinkTypeKafka = "KAFKA" + SinkTypeBigquery = "BIGQUERY" +) + +// BigQuery-related constants +const ( + keySinkBigqueryGoogleCloudProjectID = "SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID" + keySinkBigqueryDatasetName = "SINK_BIGQUERY_DATASET_NAME" + keySinkBigqueryTableName = "SINK_BIGQUERY_TABLE_NAME" + keySinkBigqueryDatasetLabels = "SINK_BIGQUERY_DATASET_LABELS" + keySinkBigqueryTableLabels = "SINK_BIGQUERY_TABLE_LABELS" + keySinkBigqueryTablePartitioningEnable = "SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE" + keySinkBigqueryTableClusteringEnable = "SINK_BIGQUERY_TABLE_CLUSTERING_ENABLE" + keySinkBigqueryBatchSize = "SINK_BIGQUERY_BATCH_SIZE" + keySinkBigqueryTablePartitionKey = "SINK_BIGQUERY_TABLE_PARTITION_KEY" + keySinkBigqueryRowInsertIDEnable = "SINK_BIGQUERY_ROW_INSERT_ID_ENABLE" + keySinkBigqueryClientReadTimeoutMs = "SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS" + keySinkBigqueryClientConnectTimeoutMs = "SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS" + keySinkBigqueryTablePartitionExpiryMs = "SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS" + keySinkBigqueryDatasetLocation = "SINK_BIGQUERY_DATASET_LOCATION" + keySinkErrorTypesForFailure = "SINK_ERROR_TYPES_FOR_FAILURE" + keySinkBigqueryTableClusteringKeys = "SINK_BIGQUERY_TABLE_CLUSTERING_KEYS" +) + +var ( + //go:embed schema/config.json + configSchemaRaw []byte + + validateConfig = validator.FromJSONSchema(configSchemaRaw) +) + +type UsageSpec struct { + CPU string `json:"cpu,omitempty" validate:"required"` + Memory string `json:"memory,omitempty" validate:"required"` +} + +type Resources struct { + TaskManager UsageSpec `json:"taskmanager,omitempty"` + JobManager UsageSpec `json:"jobmanager,omitempty"` +} + +type Config struct { + Resources Resources `json:"resources,omitempty"` + Source []Source `json:"source,omitempty"` + Sink Sink `json:"sink,omitempty"` + EnvVariables map[string]string `json:"env_variables,omitempty"` + Replicas int `json:"replicas"` + SinkType string `json:"sink_type"` + Team string `json:"team"` + FlinkName string `json:"flink_name,omitempty"` + DeploymentID string `json:"deployment_id,omitempty"` + Savepoint any `json:"savepoint,omitempty"` + ChartValues *ChartValues `json:"chart_values,omitempty"` + Deleted bool `json:"deleted,omitempty"` + Namespace string `json:"namespace,omitempty"` + PrometheusURL string `json:"prometheus_url,omitempty"` + JarURI string `json:"jar_uri,omitempty"` +} + +type ChartValues struct { + ImageRepository string `json:"image_repository" validate:"required"` + ImageTag string `json:"image_tag" validate:"required"` + ChartVersion string `json:"chart_version" validate:"required"` + ImagePullPolicy string `json:"image_pull_policy"` +} + +type SourceDetail struct { + SourceName string `json:"SOURCE_NAME"` + SourceType string `json:"SOURCE_TYPE"` +} + +type SourceKafka struct { + SourceKafkaConsumerConfigAutoCommitEnable string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE"` + SourceKafkaConsumerConfigAutoOffsetReset string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"` + SourceKafkaTopicNames string `json:"SOURCE_KAFKA_TOPIC_NAMES"` + SourceKafkaName string `json:"SOURCE_KAFKA_NAME"` + SourceKafkaConsumerConfigGroupID 
string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID"`
+	SourceKafkaConsumerConfigBootstrapServers string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS"`
+}
+
+type SourceParquet struct {
+	SourceParquetFileDateRange interface{} `json:"SOURCE_PARQUET_FILE_DATE_RANGE"`
+	SourceParquetFilePaths     []string    `json:"SOURCE_PARQUET_FILE_PATHS"`
+}
+
+type Source struct {
+	InputSchemaProtoClass               string         `json:"INPUT_SCHEMA_PROTO_CLASS"`
+	InputSchemaEventTimestampFieldIndex string         `json:"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX"`
+	SourceDetails                       []SourceDetail `json:"SOURCE_DETAILS"`
+	InputSchemaTable                    string         `json:"INPUT_SCHEMA_TABLE"`
+	SourceKafka
+	SourceParquet
+}
+
+type SinkKafka struct {
+	SinkKafkaBrokers  string `json:"SINK_KAFKA_BROKERS"`
+	SinkKafkaStream   string `json:"SINK_KAFKA_STREAM"`
+	SinkKafkaTopic    string `json:"SINK_KAFKA_TOPIC"`
+	SinkKafkaProtoMsg string `json:"SINK_KAFKA_PROTO_MESSAGE"`
+	SinkKafkaLingerMs string `json:"SINK_KAFKA_LINGER_MS"`
+	SinkKafkaProtoKey string `json:"SINK_KAFKA_PROTO_KEY"`
+}
+
+type SinkInflux struct {
+	SinkInfluxDBName          string `json:"SINK_INFLUX_DB_NAME"`
+	SinkInfluxMeasurementName string `json:"SINK_INFLUX_MEASUREMENT_NAME"`
+}
+
+type SinkBigquery struct {
+	SinkBigqueryGoogleCloudProjectID    string `json:"SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID"`
+	SinkBigqueryTableName               string `json:"SINK_BIGQUERY_TABLE_NAME"`
+	SinkBigqueryDatasetLabels           string `json:"SINK_BIGQUERY_DATASET_LABELS"`
+	SinkBigqueryTableLabels             string `json:"SINK_BIGQUERY_TABLE_LABELS"`
+	SinkBigqueryDatasetName             string `json:"SINK_BIGQUERY_DATASET_NAME"`
+	SinkBigqueryTablePartitioningEnable string `json:"SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"`
+	SinkBigqueryTablePartitionKey       string `json:"SINK_BIGQUERY_TABLE_PARTITION_KEY"`
+	SinkBigqueryRowInsertIDEnable       string `json:"SINK_BIGQUERY_ROW_INSERT_ID_ENABLE"`
+	SinkBigqueryClientReadTimeoutMs     string `json:"SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS"`
+	SinkBigqueryClientConnectTimeoutMs  string `json:"SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS"`
+	SinkBigqueryTablePartitionExpiryMs  string `json:"SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS"`
+	SinkBigqueryDatasetLocation         string `json:"SINK_BIGQUERY_DATASET_LOCATION"`
+	SinkBigqueryBatchSize               string `json:"SINK_BIGQUERY_BATCH_SIZE"`
+	SinkBigqueryTableClusteringEnable   string `json:"SINK_BIGQUERY_TABLE_CLUSTERING_ENABLE"`
+	SinkBigqueryTableClusteringKeys     string `json:"SINK_BIGQUERY_TABLE_CLUSTERING_KEYS"`
+	SinkErrorTypesForFailure            string `json:"SINK_ERROR_TYPES_FOR_FAILURE"`
+}
+
+type Sink struct {
+	SinkKafka
+	SinkInflux
+	SinkBigquery
+}
+
+func readConfig(r module.ExpandedResource, confJSON json.RawMessage, dc driverConf) (*Config, error) {
+	var cfg Config
+	err := json.Unmarshal(confJSON, &cfg)
+	if err != nil {
+		return nil, errors.ErrInvalid.WithMsgf("invalid config json").WithCausef(err.Error())
+	}
+
+	//transformation #9 and #11
+	//transformation #1
+	source := cfg.Source
+
+	for i := range source {
+		if len(source[i].SourceParquet.SourceParquetFilePaths) > 0 {
+			//source is parquet
+			//do nothing
+			continue
+		}
+		//TODO: check how to handle increment group id on update
+		if source[i].SourceKafkaConsumerConfigGroupID == "" {
+			source[i].SourceKafkaConsumerConfigGroupID = incrementGroupId(r.Name+"-0001", i)
+		}
+		source[i].SourceKafkaConsumerConfigAutoCommitEnable = dc.EnvVariables[SourceKafkaConsumerConfigAutoCommitEnable]
+		source[i].SourceKafkaConsumerConfigAutoOffsetReset =
dc.EnvVariables[SourceKafkaConsumerConfigAutoOffsetReset] + source[i].SourceKafkaConsumerConfigBootstrapServers = dc.EnvVariables[SourceKafkaConsumerConfigBootstrapServers] + } + + cfg.Source = source + + //transformation #2 + cfg.EnvVariables = modules.CloneAndMergeMaps(dc.EnvVariables, cfg.EnvVariables) + + //transformation #3 + var flinkOut flink.Output + if err := json.Unmarshal(r.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil { + return nil, errors.ErrInternal.WithMsgf("invalid flink state").WithCausef(err.Error()) + } + + cfg.Namespace = flinkOut.KubeNamespace + + //transformation #4 + //transform resource name to safe length + + //transformation #5 + //TODO: build name from title as project--dagger + cfg.EnvVariables[keyFlinkJobID] = r.Name + + //transformation #6 + // note: enforce the kubernetes deployment name length limit. + if len(cfg.DeploymentID) == 0 { + cfg.DeploymentID = modules.SafeName(fmt.Sprintf("%s-%s", r.Project, r.Name), "-dagger", helmReleaseNameMaxLength) + } else if len(cfg.DeploymentID) > helmReleaseNameMaxLength { + return nil, errors.ErrInvalid.WithMsgf("deployment_id must not have more than 53 chars") + } + + //transformation #7 + cfg.EnvVariables[keySinkInfluxURL] = flinkOut.Influx.URL + cfg.EnvVariables[keySinkInfluxPassword] = flinkOut.Influx.Password + cfg.EnvVariables[keySinkInfluxUsername] = flinkOut.Influx.Username + //SINK_INFLUX_DB_NAME is added by client + //SINK_INFLUX_MEASUREMENT_NAME is added by client + //REDIS_SERVER is skipped + + //transformation #8 + //Longbow configs would be in base configs + + //transformation #10 + //this shall check if the project of the conf.EnvVars.STREAMS is same as that of the corresponding flink + //do we need to check this? + + //transformation #13 + cfg.EnvVariables[keySinkType] = cfg.SinkType + if cfg.SinkType == SinkTypeKafka { + cfg.EnvVariables[keySinkKafkaStream] = cfg.Sink.SinkKafka.SinkKafkaStream + cfg.EnvVariables[keySinkKafkaBrokers] = cfg.Sink.SinkKafka.SinkKafkaBrokers + cfg.EnvVariables[keySinkKafkaProtoMsg] = cfg.Sink.SinkKafka.SinkKafkaProtoMsg + cfg.EnvVariables[keySinkKafkaTopic] = cfg.Sink.SinkKafka.SinkKafkaTopic + cfg.EnvVariables[keySinkKafkaKey] = cfg.Sink.SinkKafka.SinkKafkaProtoKey + cfg.EnvVariables[keySinkKafkaLingerMs] = cfg.Sink.SinkKafka.SinkKafkaLingerMs + } else if cfg.SinkType == SinkTypeInflux { + cfg.EnvVariables[keySinkInfluxDBName] = cfg.Sink.SinkInflux.SinkInfluxDBName + cfg.EnvVariables[keySinkInfluxMeasurementName] = cfg.Sink.SinkInflux.SinkInfluxMeasurementName + } else if cfg.SinkType == SinkTypeBigquery { + cfg.EnvVariables[keySinkBigqueryGoogleCloudProjectID] = cfg.Sink.SinkBigquery.SinkBigqueryGoogleCloudProjectID + cfg.EnvVariables[keySinkBigqueryDatasetName] = cfg.Sink.SinkBigquery.SinkBigqueryDatasetName + cfg.EnvVariables[keySinkBigqueryTableName] = cfg.Sink.SinkBigquery.SinkBigqueryTableName + cfg.EnvVariables[keySinkBigqueryDatasetLabels] = cfg.Sink.SinkBigquery.SinkBigqueryDatasetLabels + cfg.EnvVariables[keySinkBigqueryTableLabels] = cfg.Sink.SinkBigquery.SinkBigqueryTableLabels + cfg.EnvVariables[keySinkBigqueryTablePartitioningEnable] = cfg.Sink.SinkBigquery.SinkBigqueryTablePartitioningEnable + cfg.EnvVariables[keySinkBigqueryTablePartitionKey] = cfg.Sink.SinkBigquery.SinkBigqueryTablePartitionKey + cfg.EnvVariables[keySinkBigqueryRowInsertIDEnable] = cfg.Sink.SinkBigquery.SinkBigqueryRowInsertIDEnable + cfg.EnvVariables[keySinkBigqueryClientReadTimeoutMs] = cfg.Sink.SinkBigquery.SinkBigqueryClientReadTimeoutMs + 
cfg.EnvVariables[keySinkBigqueryClientConnectTimeoutMs] = cfg.Sink.SinkBigquery.SinkBigqueryClientConnectTimeoutMs + cfg.EnvVariables[keySinkBigqueryTablePartitionExpiryMs] = cfg.Sink.SinkBigquery.SinkBigqueryTablePartitionExpiryMs + cfg.EnvVariables[keySinkBigqueryDatasetLocation] = cfg.Sink.SinkBigquery.SinkBigqueryDatasetLocation + cfg.EnvVariables[keySinkBigqueryBatchSize] = cfg.Sink.SinkBigquery.SinkBigqueryBatchSize + cfg.EnvVariables[keySinkBigqueryTableClusteringEnable] = cfg.Sink.SinkBigquery.SinkBigqueryTableClusteringEnable + cfg.EnvVariables[keySinkBigqueryTableClusteringKeys] = cfg.Sink.SinkBigquery.SinkBigqueryTableClusteringKeys + cfg.EnvVariables[keySinkErrorTypesForFailure] = cfg.Sink.SinkBigquery.SinkErrorTypesForFailure + } + + //transformation #14 + cfg.Resources = mergeResources(dc.Resources, cfg.Resources) + + cfg.PrometheusURL = flinkOut.PrometheusURL + cfg.FlinkName = flinkOut.FlinkName + + if cfg.Replicas <= 0 { + cfg.Replicas = 1 + } + + if err := validateConfig(confJSON); err != nil { + return nil, err + } + + return &cfg, nil +} + +func incrementGroupId(groupId string, step int) string { + incrementNumberInString := func(number string) int { + num, _ := strconv.Atoi(number) + return num + step + } + + leftZeroPad := func(number int) string { + return fmt.Sprintf("%04d", number) + } + + getLastAndRestFromArray := func(arr []string) ([]string, string) { + return arr[:len(arr)-1], arr[len(arr)-1] + } + + parts := strings.Split(groupId, "-") + name, number := getLastAndRestFromArray(parts) + updatedNumber := leftZeroPad(incrementNumberInString(number)) + return strings.Join(append(name, updatedNumber), "-") +} + +func mustMarshalJSON(v interface{}) []byte { + data, err := json.Marshal(v) + if err != nil { + panic(fmt.Sprintf("failed to marshal JSON: %v", err)) + } + return data +} + +func mergeResources(oldResources, newResources Resources) Resources { + if newResources.TaskManager.CPU == "" { + newResources.TaskManager.CPU = oldResources.TaskManager.CPU + } + if newResources.TaskManager.Memory == "" { + newResources.TaskManager.Memory = oldResources.TaskManager.Memory + } + if newResources.JobManager.CPU == "" { + newResources.JobManager.CPU = oldResources.JobManager.CPU + } + if newResources.JobManager.Memory == "" { + newResources.JobManager.Memory = oldResources.JobManager.Memory + } + return newResources +} diff --git a/modules/dagger/driver.go b/modules/dagger/driver.go new file mode 100644 index 00000000..1621dbac --- /dev/null +++ b/modules/dagger/driver.go @@ -0,0 +1,243 @@ +package dagger + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "html/template" + "strings" + "time" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/core/resource" + "github.com/goto/entropy/modules" + "github.com/goto/entropy/modules/kubernetes" + "github.com/goto/entropy/pkg/errors" + "github.com/goto/entropy/pkg/helm" + "github.com/goto/entropy/pkg/kube" +) + +const ( + stepReleaseCreate = "release_create" + stepReleaseUpdate = "release_update" +) + +const ( + chartRepo = "https://goto.github.io/charts/" + chartName = "dagger-deployment-chart" + imageRepo = "gotocompany/dagger" +) + +const ( + labelsConfKey = "extra_labels" + + labelDeployment = "deployment" + labelOrchestrator = "orchestrator" + labelURN = "urn" + labelName = "name" + labelNamespace = "namespace" + + orchestratorLabelValue = "entropy" +) + +const defaultKey = "default" + +var defaultDriverConf = driverConf{ + Namespace: map[string]string{ + defaultKey: "dagger", 
+ }, + ChartValues: ChartValues{ + ChartVersion: "0.1.0", + }, +} + +type daggerDriver struct { + timeNow func() time.Time + conf driverConf + kubeDeploy kubeDeployFn + kubeGetPod kubeGetPodFn + kubeGetCRD kubeGetCRDFn +} + +type ( + kubeDeployFn func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error + kubeGetPodFn func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) + kubeGetCRDFn func(ctx context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error) +) + +type driverConf struct { + // Labels to be injected to the chart during deployment. Values can be Go templates. + Labels map[string]string `json:"labels,omitempty"` + + // Namespace is the kubernetes namespace where firehoses will be deployed. + Namespace map[string]string `json:"namespace" validate:"required"` + + // ChartValues is the chart and image version information. + ChartValues ChartValues `json:"chart_values" validate:"required"` + + EnvVariables map[string]string `json:"env_variables,omitempty"` + + Resources Resources `json:"resources" validate:"required"` + + JarURI string `json:"jar_uri" validate:"required"` + + // timeout value for a kube deployment run + KubeDeployTimeout int `json:"kube_deploy_timeout_seconds"` +} + +type Output struct { + State string `json:"state,omitempty"` + JMDeployStatus string `json:"jm_deploy_status,omitempty"` + JobStatus string `json:"job_status,omitempty"` + Reconcilation string `json:"reconcilation,omitempty"` + Pods []kube.Pod `json:"pods,omitempty"` + Namespace string `json:"namespace,omitempty"` + JobID string `json:"job_id,omitempty"` +} + +type transientData struct { + PendingSteps []string `json:"pending_steps"` +} + +func mergeChartValues(cur, newVal *ChartValues) (*ChartValues, error) { + if newVal == nil { + return cur, nil + } + + merged := ChartValues{ + ChartVersion: newVal.ChartVersion, + } + + return &merged, nil +} + +func readOutputData(exr module.ExpandedResource) (*Output, error) { + var curOut Output + if len(exr.Resource.State.Output) == 0 { + return &curOut, nil + } + if err := json.Unmarshal(exr.Resource.State.Output, &curOut); err != nil { + return nil, errors.ErrInternal.WithMsgf("corrupted output").WithCausef(err.Error()) + } + return &curOut, nil +} + +func readTransientData(exr module.ExpandedResource) (*transientData, error) { + if len(exr.Resource.State.ModuleData) == 0 { + return &transientData{}, nil + } + + var modData transientData + if err := json.Unmarshal(exr.Resource.State.ModuleData, &modData); err != nil { + return nil, errors.ErrInternal.WithMsgf("corrupted transient data").WithCausef(err.Error()) + } + return &modData, nil +} + +func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config, + kubeOut kubernetes.Output, +) (*helm.ReleaseConfig, error) { + + entropyLabels := map[string]string{ + labelDeployment: conf.DeploymentID, + labelOrchestrator: orchestratorLabelValue, + } + + otherLabels := map[string]string{ + labelURN: res.URN, + labelName: res.Name, + labelNamespace: conf.Namespace, + } + + deploymentLabels, err := renderTpl(dd.conf.Labels, modules.CloneAndMergeMaps(res.Labels, modules.CloneAndMergeMaps(entropyLabels, otherLabels))) + if err != nil { + return nil, err + } + + rc := helm.DefaultReleaseConfig() + rc.Timeout = dd.conf.KubeDeployTimeout + rc.Name = conf.DeploymentID + rc.Repository = chartRepo + rc.Chart = chartName + rc.Namespace = conf.Namespace + rc.ForceUpdate = true + rc.Version = 
conf.ChartValues.ChartVersion
+
+	imageRepository := dd.conf.ChartValues.ImageRepository
+	if conf.ChartValues.ImageRepository != "" {
+		imageRepository = conf.ChartValues.ImageRepository
+	}
+
+	var programArgs []string
+	for key, value := range conf.EnvVariables {
+		// Check if the value is a JSON object and escape quotes if necessary
+		if json.Valid([]byte(value)) {
+			value = strings.ReplaceAll(value, `"`, `\"`)
+		}
+		programArgs = append(programArgs, fmt.Sprintf("\"%s\"", "--"+key), fmt.Sprintf("\"%v\"", value))
+	}
+
+	// wrap the quoted args in a JSON-style array literal, then base64-encode
+	// the whole thing so the chart can pass it through as one opaque string.
+	formatted := fmt.Sprintf("[%s]", strings.Join(programArgs, ","))
+	encodedProgramArgs := base64.StdEncoding.EncodeToString([]byte(formatted))
+
+	rc.Values = map[string]any{
+		labelsConfKey:   modules.CloneAndMergeMaps(deploymentLabels, entropyLabels),
+		"image":         imageRepository,
+		"deployment_id": conf.DeploymentID,
+		"configuration": map[string]any{
+			"FLINK_PARALLELISM": conf.Replicas,
+		},
+		"projectID":      res.Project,
+		"name":           res.Name,
+		"team":           conf.Team,
+		"flink_name":     conf.FlinkName,
+		"prometheus_url": conf.PrometheusURL,
+		"resources": map[string]any{
+			"jobmanager": map[string]any{
+				"cpu":    conf.Resources.JobManager.CPU,
+				"memory": conf.Resources.JobManager.Memory,
+			},
+			"taskmanager": map[string]any{
+				"cpu":    conf.Resources.TaskManager.CPU,
+				"memory": conf.Resources.TaskManager.Memory,
+			},
+		},
+		"jarURI":      conf.JarURI,
+		"programArgs": append([]string{"--encodedArgs"}, encodedProgramArgs),
+		"state":       "running",
+		"namespace":   conf.Namespace,
+	}
+
+	return rc, nil
+}
+
+// TODO: move this to pkg
+func renderTpl(labelsTpl map[string]string, labelsValues map[string]string) (map[string]string, error) {
+	const useZeroValueForMissingKey = "missingkey=zero"
+
+	finalLabels := map[string]string{}
+	for k, v := range labelsTpl {
+		var buf bytes.Buffer
+		t, err := template.New("").Option(useZeroValueForMissingKey).Parse(v)
+		if err != nil {
+			return nil, errors.ErrInvalid.
+				WithMsgf("label template for '%s' is invalid", k).WithCausef(err.Error())
+		} else if err := t.Execute(&buf, labelsValues); err != nil {
+			return nil, errors.ErrInvalid.
+ WithMsgf("failed to render label template").WithCausef(err.Error()) + } + + // allow empty values + // labelVal := strings.TrimSpace(buf.String()) + // if labelVal == "" { + // continue + // } + + finalLabels[k] = buf.String() + } + return finalLabels, nil +} diff --git a/modules/dagger/driver_output.go b/modules/dagger/driver_output.go new file mode 100644 index 00000000..6b3f3873 --- /dev/null +++ b/modules/dagger/driver_output.go @@ -0,0 +1,60 @@ +package dagger + +import ( + "context" + "encoding/json" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/core/resource" + "github.com/goto/entropy/modules" + "github.com/goto/entropy/modules/flink" + "github.com/goto/entropy/modules/kubernetes" + "github.com/goto/entropy/pkg/errors" +) + +func (dd *daggerDriver) Output(ctx context.Context, exr module.ExpandedResource) (json.RawMessage, error) { + output, err := readOutputData(exr) + if err != nil { + return nil, err + } + + conf, err := readConfig(exr, exr.Spec.Configs, dd.conf) + if err != nil { + return nil, errors.ErrInternal.WithCausef(err.Error()) + } + + var flinkOut flink.Output + if err := json.Unmarshal(exr.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil { + return nil, errors.ErrInternal.WithMsgf("invalid kube state").WithCausef(err.Error()) + } + + return dd.refreshOutput(ctx, exr.Resource, *conf, *output, flinkOut.KubeCluster) +} + +func (dd *daggerDriver) refreshOutput(ctx context.Context, r resource.Resource, + conf Config, output Output, kubeOut kubernetes.Output, +) (json.RawMessage, error) { + rc, err := dd.getHelmRelease(r, conf, kubeOut) + if err != nil { + return nil, err + } + + pods, err := dd.kubeGetPod(ctx, kubeOut.Configs, rc.Namespace, map[string]string{"app": conf.DeploymentID}) + if err != nil { + return nil, errors.ErrInternal.WithCausef(err.Error()) + } + output.Pods = pods + output.Namespace = conf.Namespace + output.JobID = conf.DeploymentID + + crd, err := dd.kubeGetCRD(ctx, kubeOut.Configs, rc.Namespace, rc.Name) + if err != nil { + return nil, errors.ErrInternal.WithCausef(err.Error()) + } + + output.JMDeployStatus = crd.JMDeployStatus + output.JobStatus = crd.JobStatus + output.Reconcilation = crd.Reconciliation + + return modules.MustJSON(output), nil +} diff --git a/modules/dagger/driver_plan.go b/modules/dagger/driver_plan.go new file mode 100644 index 00000000..f18d245a --- /dev/null +++ b/modules/dagger/driver_plan.go @@ -0,0 +1,144 @@ +package dagger + +import ( + "context" + "encoding/json" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/core/resource" + "github.com/goto/entropy/modules" + "github.com/goto/entropy/modules/flink" + "github.com/goto/entropy/pkg/errors" +) + +const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET" + +func (dd *daggerDriver) Plan(_ context.Context, exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { + switch act.Name { + case module.CreateAction: + return dd.planCreate(exr, act) + + default: + return dd.planChange(exr, act) + } +} + +func (dd *daggerDriver) planCreate(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { + conf, err := readConfig(exr, act.Params, dd.conf) + if err != nil { + return nil, err + } + + //transformation #12 + conf.EnvVariables[keyStreams] = string(mustMarshalJSON(conf.Source)) + + chartVals, err := mergeChartValues(&dd.conf.ChartValues, conf.ChartValues) + if err != nil { + return nil, err + } + conf.ChartValues = chartVals + + 
immediately := dd.timeNow() + conf.JarURI = dd.conf.JarURI + + exr.Resource.Spec.Configs = modules.MustJSON(conf) + + err = dd.validateHelmReleaseConfigs(exr, *conf) + if err != nil { + return nil, err + } + + exr.Resource.State = resource.State{ + Status: resource.StatusPending, + Output: modules.MustJSON(Output{ + Namespace: conf.Namespace, + }), + NextSyncAt: &immediately, + ModuleData: modules.MustJSON(transientData{ + PendingSteps: []string{stepReleaseCreate}, + }), + } + + return &exr.Resource, nil +} + +func (dd *daggerDriver) planChange(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { + curConf, err := readConfig(exr, exr.Resource.Spec.Configs, dd.conf) + if err != nil { + return nil, err + } + + switch act.Name { + case module.UpdateAction: + newConf, err := readConfig(exr, act.Params, dd.conf) + if err != nil { + return nil, err + } + + newConf.Source = mergeConsumerGroupId(curConf.Source, newConf.Source) + + chartVals, err := mergeChartValues(curConf.ChartValues, newConf.ChartValues) + if err != nil { + return nil, err + } + + // restore configs that are not user-controlled. + newConf.DeploymentID = curConf.DeploymentID + newConf.ChartValues = chartVals + newConf.JarURI = curConf.JarURI + + newConf.Resources = mergeResources(curConf.Resources, newConf.Resources) + + curConf = newConf + } + + immediately := dd.timeNow() + + exr.Resource.Spec.Configs = modules.MustJSON(curConf) + + err = dd.validateHelmReleaseConfigs(exr, *curConf) + if err != nil { + return nil, err + } + + exr.Resource.State = resource.State{ + Status: resource.StatusPending, + Output: exr.Resource.State.Output, + ModuleData: modules.MustJSON(transientData{ + PendingSteps: []string{stepReleaseUpdate}, + }), + NextSyncAt: &immediately, + } + + return &exr.Resource, nil +} + +func (dd *daggerDriver) validateHelmReleaseConfigs(expandedResource module.ExpandedResource, config Config) error { + var flinkOut flink.Output + if err := json.Unmarshal(expandedResource.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil { + return errors.ErrInternal.WithMsgf("invalid kube state").WithCausef(err.Error()) + } + + _, err := dd.getHelmRelease(expandedResource.Resource, config, flinkOut.KubeCluster) + return err +} + +func mergeConsumerGroupId(currStreams, newStreams []Source) []Source { + if len(currStreams) != len(newStreams) { + return newStreams + } + + for i := range currStreams { + if currStreams[i].SourceParquet.SourceParquetFilePaths != nil && len(currStreams[i].SourceParquet.SourceParquetFilePaths) > 0 { + //source is parquete + //do nothing + continue + } + + if currStreams[i].SourceKafka.SourceKafkaName == newStreams[i].SourceKafka.SourceKafkaName { + newStreams[i].SourceKafka.SourceKafkaConsumerConfigGroupID = currStreams[i].SourceKafka.SourceKafkaConsumerConfigGroupID + } + } + + return newStreams +} diff --git a/modules/dagger/driver_sync.go b/modules/dagger/driver_sync.go new file mode 100644 index 00000000..34885d15 --- /dev/null +++ b/modules/dagger/driver_sync.go @@ -0,0 +1,88 @@ +package dagger + +import ( + "context" + "encoding/json" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/core/resource" + "github.com/goto/entropy/modules" + "github.com/goto/entropy/modules/flink" + "github.com/goto/entropy/modules/kubernetes" + "github.com/goto/entropy/pkg/errors" +) + +func (dd *daggerDriver) Sync(ctx context.Context, exr module.ExpandedResource) (*resource.State, error) { + modData, err := readTransientData(exr) + if err != nil { + return nil, 
err + } + + out, err := readOutputData(exr) + if err != nil { + return nil, errors.ErrInternal.WithCausef(err.Error()) + } + + conf, err := readConfig(exr, exr.Spec.Configs, dd.conf) + if err != nil { + return nil, errors.ErrInternal.WithCausef(err.Error()) + } + + var flinkOut flink.Output + if err := json.Unmarshal(exr.Dependencies[keyFlinkDependency].Output, &flinkOut); err != nil { + return nil, errors.ErrInternal.WithMsgf("invalid flink state").WithCausef(err.Error()) + } + + finalState := resource.State{ + Status: resource.StatusPending, + Output: exr.Resource.State.Output, + } + + if len(modData.PendingSteps) > 0 { + pendingStep := modData.PendingSteps[0] + modData.PendingSteps = modData.PendingSteps[1:] + + switch pendingStep { + case stepReleaseCreate, stepReleaseUpdate: + isCreate := pendingStep == stepReleaseCreate + if err := dd.releaseSync(ctx, exr.Resource, isCreate, *conf, flinkOut.KubeCluster); err != nil { + return nil, err + } + default: + return nil, errors.ErrInternal.WithMsgf("unknown step: '%s'", pendingStep) + } + + // we have more pending states, so enqueue resource for another sync + // as soon as possible. + immediately := dd.timeNow() + finalState.NextSyncAt = &immediately + finalState.ModuleData = modules.MustJSON(modData) + + return &finalState, nil + } + + finalOut, err := dd.refreshOutput(ctx, exr.Resource, *conf, *out, flinkOut.KubeCluster) + if err != nil { + return nil, err + } + finalState.Output = finalOut + + finalState.Status = resource.StatusCompleted + finalState.ModuleData = nil + return &finalState, nil + +} + +func (dd *daggerDriver) releaseSync(ctx context.Context, r resource.Resource, + isCreate bool, conf Config, kubeOut kubernetes.Output, +) error { + rc, err := dd.getHelmRelease(r, conf, kubeOut) + if err != nil { + return err + } + + if err := dd.kubeDeploy(ctx, isCreate, kubeOut.Configs, *rc); err != nil { + return errors.ErrInternal.WithCausef(err.Error()) + } + return nil +} diff --git a/modules/dagger/module.go b/modules/dagger/module.go new file mode 100644 index 00000000..bfc41491 --- /dev/null +++ b/modules/dagger/module.go @@ -0,0 +1,127 @@ +package dagger + +import ( + "context" + _ "embed" + "encoding/json" + "time" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/modules/flink" + "github.com/goto/entropy/pkg/errors" + "github.com/goto/entropy/pkg/helm" + "github.com/goto/entropy/pkg/kube" + "github.com/goto/entropy/pkg/validator" + "helm.sh/helm/v3/pkg/release" + v1 "k8s.io/api/core/v1" +) + +const ( + keyFlinkDependency = "flink" +) + +type FlinkCRDStatus struct { + JobManagerDeploymentStatus string `json:"jobManagerDeploymentStatus"` + JobStatus string `json:"jobStatus"` + ReconciliationStatus string `json:"reconciliationStatus"` +} + +var Module = module.Descriptor{ + Kind: "dagger", + Dependencies: map[string]string{ + keyFlinkDependency: flink.Module.Kind, + }, + Actions: []module.ActionDesc{ + { + Name: module.CreateAction, + Description: "Creates a new dagger", + }, + { + Name: module.UpdateAction, + Description: "Updates an existing dagger", + }, + }, + DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) { + conf := defaultDriverConf // clone the default value + if err := json.Unmarshal(confJSON, &conf); err != nil { + return nil, err + } else if err := validator.TaggedStruct(conf); err != nil { + return nil, err + } + + return &daggerDriver{ + conf: conf, + timeNow: time.Now, + kubeDeploy: func(_ context.Context, isCreate bool, kubeConf kube.Config, hc helm.ReleaseConfig) error { + 
canUpdate := func(rel *release.Release) bool {
+				curLabels, ok := rel.Config[labelsConfKey].(map[string]any)
+				if !ok {
+					return false
+				}
+				newLabels, ok := hc.Values[labelsConfKey].(map[string]string)
+				if !ok {
+					return false
+				}
+
+				isManagedByEntropy := curLabels[labelOrchestrator] == orchestratorLabelValue
+				isSameDeployment := curLabels[labelDeployment] == newLabels[labelDeployment]
+
+				return isManagedByEntropy && isSameDeployment
+			}
+
+			helmCl := helm.NewClient(&helm.Config{Kubernetes: kubeConf})
+			_, errHelm := helmCl.Upsert(&hc, canUpdate)
+			return errHelm
+		},
+		kubeGetPod: func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) {
+			kubeCl, err := kube.NewClient(ctx, conf)
+			if err != nil {
+				return nil, errors.ErrInternal.WithMsgf("failed to create new kube client on dagger driver kube get pod").WithCausef(err.Error())
+			}
+			return kubeCl.GetPodDetails(ctx, ns, labels, func(pod v1.Pod) bool {
+				// allow pods that are in running state and are not marked for deletion
+				return pod.Status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil
+			})
+		},
+		kubeGetCRD: func(ctx context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error) {
+			kubeCl, err := kube.NewClient(ctx, conf)
+			if err != nil {
+				return kube.FlinkDeploymentStatus{}, errors.ErrInternal.WithMsgf("failed to create new kube client on dagger driver kube get CRD").WithCausef(err.Error())
+			}
+			crd, err := kubeCl.GetCRDDetails(ctx, ns, name)
+			if err != nil {
+				return kube.FlinkDeploymentStatus{}, err
+			}
+			return parseFlinkCRDStatus(crd.Object)
+		}}, nil
+	},
+}
+
+func parseFlinkCRDStatus(flinkDeployment map[string]interface{}) (kube.FlinkDeploymentStatus, error) {
+	var flinkCRDStatus FlinkCRDStatus
+	statusInterface, ok := flinkDeployment["status"].(map[string]interface{})
+	if !ok {
+		return kube.FlinkDeploymentStatus{}, errors.ErrInternal.WithMsgf("failed to convert flink deployment status to map[string]interface{}")
+	}
+
+	if jmStatus, ok := statusInterface["jobManagerDeploymentStatus"].(string); ok {
+		flinkCRDStatus.JobManagerDeploymentStatus = jmStatus
+	}
+	if jobStatus, ok := statusInterface["jobStatus"].(map[string]interface{}); ok {
+		if st, ok := jobStatus["state"].(string); ok {
+			flinkCRDStatus.JobStatus = st
+		}
+	}
+	if reconciliationStatus, ok := statusInterface["reconciliationStatus"].(map[string]interface{}); ok {
+		if st, ok := reconciliationStatus["state"].(string); ok {
+			flinkCRDStatus.ReconciliationStatus = st
+		}
+	}
+
+	status := kube.FlinkDeploymentStatus{
+		JMDeployStatus: flinkCRDStatus.JobManagerDeploymentStatus,
+		JobStatus:      flinkCRDStatus.JobStatus,
+		Reconciliation: flinkCRDStatus.ReconciliationStatus,
+	}
+	return status, nil
+}
diff --git a/modules/dagger/schema/config.json b/modules/dagger/schema/config.json
new file mode 100644
index 00000000..3effabd0
--- /dev/null
+++ b/modules/dagger/schema/config.json
@@ -0,0 +1,49 @@
+{
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "$id": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "required": [
+        "replicas",
+        "env_variables",
+        "team",
+        "source",
+        "sink",
+        "sink_type"
+    ],
+    "properties": {
+        "replicas": {
+            "type": "number",
+            "default": 1,
+            "minimum": 1
+        },
+        "deployment_id": {
+            "type": "string"
+        },
+        "sink_type": {
+            "type": "string",
+            "enum": [
+                "INFLUX",
+                "KAFKA",
+                "BIGQUERY"
+            ]
+        },
+        "env_variables": {
+            "type": "object",
+            "additionalProperties": true,
+            "required": [
+                "SINK_TYPE"
+            ],
+            "properties": {
+ "SINK_TYPE": { + "type": "string", + "enum": [ + "INFLUX", + "KAFKA", + "BIGQUERY" + ] + } + } + } + } + } + \ No newline at end of file diff --git a/modules/flink/config.go b/modules/flink/config.go index 24a06a23..6ea778ab 100644 --- a/modules/flink/config.go +++ b/modules/flink/config.go @@ -26,6 +26,8 @@ type Config struct { KubeNamespace string `json:"kube_namespace,omitempty"` Influx Influx `json:"influx,omitempty"` SinkKafkaStream string `json:"sink_kafka_stream,omitempty"` + PrometheusURL string `json:"prometheus_url,omitempty"` + FlinkName string `json:"flink_name,omitempty"` } func readConfig(_ resource.Resource, confJSON json.RawMessage, dc driverConf) (*Config, error) { @@ -48,5 +50,13 @@ func readConfig(_ resource.Resource, confJSON json.RawMessage, dc driverConf) (* cfg.KubeNamespace = dc.KubeNamespace } + if cfg.PrometheusURL == "" { + cfg.PrometheusURL = dc.PrometheusURL + } + + if cfg.FlinkName == "" { + cfg.FlinkName = dc.FlinkName + } + return &cfg, nil } diff --git a/modules/flink/driver.go b/modules/flink/driver.go index 64a53657..9fbb0b29 100644 --- a/modules/flink/driver.go +++ b/modules/flink/driver.go @@ -16,6 +16,8 @@ type driverConf struct { Influx Influx `json:"influx,omitempty"` SinkKafkaStream string `json:"sink_kafka_stream,omitempty"` KubeNamespace string `json:"kube_namespace,omitempty"` + PrometheusURL string `json:"prometheus_url,omitempty"` + FlinkName string `json:"flink_name,omitempty"` } type Output struct { @@ -23,6 +25,8 @@ type Output struct { KubeNamespace string `json:"kube_namespace,omitempty"` Influx Influx `json:"influx,omitempty"` SinkKafkaStream string `json:"sink_kafka_stream,omitempty"` + PrometheusURL string `json:"prometheus_url,omitempty"` + FlinkName string `json:"flink_name,omitempty"` } func readOutputData(exr module.ExpandedResource) (*Output, error) { diff --git a/modules/flink/driver_output.go b/modules/flink/driver_output.go index 21823081..58596a91 100644 --- a/modules/flink/driver_output.go +++ b/modules/flink/driver_output.go @@ -33,6 +33,8 @@ func (fd *flinkDriver) Output(ctx context.Context, exr module.ExpandedResource) output.Influx = conf.Influx output.KubeNamespace = conf.KubeNamespace output.SinkKafkaStream = conf.SinkKafkaStream + output.PrometheusURL = conf.PrometheusURL + output.FlinkName = conf.FlinkName return modules.MustJSON(output), nil } diff --git a/modules/flink/schema/config.json b/modules/flink/schema/config.json index e9e5bd41..a78df504 100644 --- a/modules/flink/schema/config.json +++ b/modules/flink/schema/config.json @@ -20,7 +20,7 @@ } } }, - "sink_kafka_stream_name": { + "sink_kafka_stream": { "type": "string" } } diff --git a/pkg/helm/helm.go b/pkg/helm/helm.go index c032a1fa..c06b59fe 100644 --- a/pkg/helm/helm.go +++ b/pkg/helm/helm.go @@ -70,20 +70,22 @@ func (p *Client) doCreate(actionConfig *action.Configuration, config *ReleaseCon } act := action.NewInstall(actionConfig) - act.Wait = config.Wait - act.DryRun = false + act.Wait = true + act.IncludeCRDs = true + act.SkipCRDs = false act.Timeout = time.Duration(config.Timeout) * time.Second act.Replace = config.Replace act.OutputDir = "" act.Namespace = config.Namespace act.ClientOnly = false act.Description = config.Description - act.WaitForJobs = config.WaitForJobs + act.WaitForJobs = true act.ReleaseName = config.Name act.GenerateName = false act.NameTemplate = "" act.CreateNamespace = config.CreateNamespace act.ChartPathOptions = *chartPathOpts + act.DryRun = false rel, err := act.Run(fetchedChart, config.Values) if err != nil { diff --git 
a/pkg/kube/client.go b/pkg/kube/client.go index af9f6fa4..b4c15b2e 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -15,9 +15,12 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/selection" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" typedbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1" "k8s.io/client-go/rest" @@ -55,6 +58,13 @@ type Pod struct { Status string `json:"status"` } +type FlinkDeploymentStatus struct { + State string `json:"state"` + JMDeployStatus string `json:"jm_deploy_status"` + JobStatus string `json:"job_status"` + Reconciliation string `json:"reconciliation"` +} + type LogOptions struct { App string `mapstructure:"app"` Pod string `mapstructure:"pod"` @@ -323,6 +333,29 @@ func (c Client) GetPodDetails(ctx context.Context, namespace string, labelSelect return podDetails, nil } +func (c Client) GetCRDDetails(ctx context.Context, namespace string, name string) (*unstructured.Unstructured, error) { + // Initialize the dynamic client + dynamicClient, err := dynamic.NewForConfig(&c.restConfig) + if err != nil { + return nil, fmt.Errorf("failed to create dynamic client: %v", err) + } + + // Define the GVR (GroupVersionResource) for the FlinkDeployment CRD + gvr := schema.GroupVersionResource{ + Group: "flink.apache.org", + Version: "v1beta1", + Resource: "flinkdeployments", + } + + // Fetch the FlinkDeployment CRD details + flinkDeployment, err := dynamicClient.Resource(gvr).Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get FlinkDeployment: %v", err) + } + + return flinkDeployment, nil +} + func streamContainerLogs(ctx context.Context, ns, podName string, logCh chan<- LogChunk, clientSet *kubernetes.Clientset, podLogOpts corev1.PodLogOptions, ) error { diff --git a/test/testbench/test_data/resource/flink_resource.json b/test/testbench/test_data/resource/flink_resource.json index ada2bf23..d68f0a56 100644 --- a/test/testbench/test_data/resource/flink_resource.json +++ b/test/testbench/test_data/resource/flink_resource.json @@ -13,7 +13,7 @@ "username": "influx-user" }, "kube_namespace": "flink-ns", - "sink_kafka_stream_name": "flinkstream" + "sink_kafka_stream": "flinkstream" }, "dependencies": [ {
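
The `DriverFactory` above unmarshals its driver-level JSON into `driverConf`. Based on the json tags in `modules/dagger/driver.go`, a deployment-level config would look roughly like the sketch below; every concrete value (image tag, broker host, jar path, timeout) is a placeholder, not a default shipped by entropy.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Illustrative driver config for the dagger module, derived from the json
// tags on driverConf. All concrete values are placeholder assumptions.
const sampleDriverConf = `{
  "labels": {"team": "{{.team}}"},
  "namespace": {"default": "dagger"},
  "chart_values": {
    "image_repository": "gotocompany/dagger",
    "image_tag": "latest",
    "chart_version": "0.1.0",
    "image_pull_policy": "IfNotPresent"
  },
  "env_variables": {
    "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE": "false",
    "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest",
    "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS": "kafka-broker:9092"
  },
  "resources": {
    "jobmanager":  {"cpu": "200m", "memory": "512Mi"},
    "taskmanager": {"cpu": "500m", "memory": "1024Mi"}
  },
  "jar_uri": "local:///opt/dagger/dagger-core.jar",
  "kube_deploy_timeout_seconds": 600
}`

func main() {
	var conf map[string]any
	if err := json.Unmarshal([]byte(sampleDriverConf), &conf); err != nil {
		panic(err)
	}
	fmt.Println("driver config parses:", len(conf), "top-level keys")
}
```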
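
On the resource side, `readConfig` and `schema/config.json` expect a payload like the following hypothetical example; `planCreate` then serializes the `source` list verbatim into the `STREAMS` env variable. Field names come from the struct tags above, but the values (proto class, topic, measurement name) are illustrative only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical dagger resource config covering the schema's required fields
// (replicas, env_variables, team, source, sink, sink_type).
const sampleConfig = `{
  "replicas": 1,
  "team": "data-platform",
  "sink_type": "INFLUX",
  "env_variables": {
    "SINK_TYPE": "INFLUX",
    "FLINK_SQL_QUERY": "SELECT * FROM data_stream"
  },
  "source": [{
    "INPUT_SCHEMA_PROTO_CLASS": "com.example.BookingLogMessage",
    "INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "5",
    "INPUT_SCHEMA_TABLE": "data_stream",
    "SOURCE_DETAILS": [{"SOURCE_NAME": "KAFKA_CONSUMER", "SOURCE_TYPE": "UNBOUNDED"}],
    "SOURCE_KAFKA_TOPIC_NAMES": "booking-log",
    "SOURCE_KAFKA_NAME": "local-kafka"
  }],
  "sink": {
    "SINK_INFLUX_DB_NAME": "DAGGERS",
    "SINK_INFLUX_MEASUREMENT_NAME": "booking-dagger"
  }
}`

func main() {
	var cfg map[string]any
	if err := json.Unmarshal([]byte(sampleConfig), &cfg); err != nil {
		panic(err)
	}
	// planCreate does the equivalent of this when it sets STREAMS.
	streams, _ := json.Marshal(cfg["source"])
	fmt.Println("STREAMS =", string(streams))
}
```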
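
The consumer-group seeding in `readConfig` composes `r.Name+"-0001"` with `incrementGroupId` and the stream index, so each Kafka stream of a dagger gets its own zero-padded group ID. A standalone sketch of that arithmetic:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// incrementGroupID mirrors the patch's incrementGroupId: split on "-", add
// `step` to the numeric suffix, and re-pad it to four digits.
func incrementGroupID(groupID string, step int) string {
	parts := strings.Split(groupID, "-")
	prefix, last := parts[:len(parts)-1], parts[len(parts)-1]
	num, _ := strconv.Atoi(last) // a non-numeric suffix becomes 0, as in the patch
	return strings.Join(append(prefix, fmt.Sprintf("%04d", num+step)), "-")
}

func main() {
	// readConfig seeds stream i with incrementGroupId(r.Name+"-0001", i).
	for i := 0; i < 3; i++ {
		fmt.Println(incrementGroupID("orders-dagger-0001", i))
	}
	// Output:
	// orders-dagger-0001
	// orders-dagger-0002
	// orders-dagger-0003
}
```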
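
`mergeChartValues` deliberately carries over only `chart_version` from an override, so image fields always come from the driver-level config; a nil override keeps the current values wholesale. An illustrative test (not part of the patch) pinning that behavior:

```go
package dagger

import "testing"

// TestMergeChartValues documents the merge semantics used by planCreate and
// planChange. (Illustrative test, not included in the patch.)
func TestMergeChartValues(t *testing.T) {
	cur := &ChartValues{ChartVersion: "0.1.0", ImageRepository: "gotocompany/dagger"}

	merged, err := mergeChartValues(cur, &ChartValues{ChartVersion: "0.2.0"})
	if err != nil {
		t.Fatal(err)
	}
	// only the chart version is taken from the override.
	if merged.ChartVersion != "0.2.0" || merged.ImageRepository != "" {
		t.Errorf("got %+v", merged)
	}

	same, _ := mergeChartValues(cur, nil)
	if same != cur {
		t.Error("nil override should keep current chart values")
	}
}
```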
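
`mergeResources` implements the fallback semantics used both for driver defaults (`readConfig`) and for partial updates (`planChange`): any field the new `Resources` leaves empty inherits the old value. A small illustrative test:

```go
package dagger

import "testing"

// TestMergeResources shows that a partial resources override only replaces
// the fields it actually sets. (Illustrative test, not included in the patch.)
func TestMergeResources(t *testing.T) {
	old := Resources{
		TaskManager: UsageSpec{CPU: "500m", Memory: "2Gi"},
		JobManager:  UsageSpec{CPU: "200m", Memory: "1Gi"},
	}
	update := Resources{TaskManager: UsageSpec{CPU: "1"}}

	got := mergeResources(old, update)
	if got.TaskManager.CPU != "1" || got.TaskManager.Memory != "2Gi" {
		t.Errorf("taskmanager merge: got %+v", got.TaskManager)
	}
	if got.JobManager != old.JobManager {
		t.Errorf("jobmanager should be unchanged: got %+v", got.JobManager)
	}
}
```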
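
`getHelmRelease` flattens all env variables into quoted `"--KEY","value"` pairs, wraps them in a JSON-style array literal, and base64-encodes the result, which the chart forwards after `--encodedArgs`. The sketch below reproduces that encoding in isolation:

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"strings"
)

// encodeProgramArgs mirrors the arg-encoding in getHelmRelease.
func encodeProgramArgs(env map[string]string) string {
	var args []string
	for key, value := range env {
		// values that are themselves JSON get their quotes escaped so the
		// array literal stays parseable on the dagger side.
		if json.Valid([]byte(value)) {
			value = strings.ReplaceAll(value, `"`, `\"`)
		}
		args = append(args, fmt.Sprintf("\"%s\"", "--"+key), fmt.Sprintf("\"%v\"", value))
	}
	return base64.StdEncoding.EncodeToString([]byte("[" + strings.Join(args, ",") + "]"))
}

func main() {
	enc := encodeProgramArgs(map[string]string{"SINK_TYPE": "INFLUX"})
	raw, _ := base64.StdEncoding.DecodeString(enc)
	fmt.Println(string(raw)) // ["--SINK_TYPE","INFLUX"]
}
```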
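
`renderTpl` executes each configured label value as a Go template against the merged label map, with `missingkey=zero` so absent keys render as empty strings rather than erroring. An illustrative test:

```go
package dagger

import "testing"

// TestRenderTpl shows the label templating contract assumed by
// getHelmRelease. (Illustrative test, not included in the patch.)
func TestRenderTpl(t *testing.T) {
	got, err := renderTpl(
		map[string]string{"team": "{{.team}}", "env": "{{.environment}}"},
		map[string]string{"team": "data-platform"},
	)
	if err != nil {
		t.Fatal(err)
	}
	if got["team"] != "data-platform" {
		t.Errorf("team = %q", got["team"])
	}
	// "environment" is missing from the values map; missingkey=zero makes it
	// render as a zero value instead of failing the deployment.
	_ = got["env"]
}
```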
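
`parseFlinkCRDStatus` walks the FlinkDeployment object with manual type assertions; the apimachinery `unstructured` helpers express the same field paths more compactly, as in this sketch (paths follow the `flink.apache.org/v1beta1` status layout the patch assumes):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// flinkStatus extracts the same three fields as parseFlinkCRDStatus, using
// NestedString, which tolerates missing intermediate maps. (Sketch only,
// not part of the patch.)
func flinkStatus(obj map[string]interface{}) (jm, job, recon string) {
	jm, _, _ = unstructured.NestedString(obj, "status", "jobManagerDeploymentStatus")
	job, _, _ = unstructured.NestedString(obj, "status", "jobStatus", "state")
	recon, _, _ = unstructured.NestedString(obj, "status", "reconciliationStatus", "state")
	return jm, job, recon
}

func main() {
	obj := map[string]interface{}{
		"status": map[string]interface{}{
			"jobManagerDeploymentStatus": "READY",
			"jobStatus":                  map[string]interface{}{"state": "RUNNING"},
			"reconciliationStatus":       map[string]interface{}{"state": "DEPLOYED"},
		},
	}
	fmt.Println(flinkStatus(obj)) // READY RUNNING DEPLOYED
}
```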