From 250a3df16c0f461f3a194d4b18484077ff4ba0aa Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Fri, 25 Oct 2024 15:28:38 -0700 Subject: [PATCH 1/6] deprecates jobmapping transformer source --- backend/gen/go/protos/mgmt/v1alpha1/job.pb.go | 1 + backend/protos/mgmt/v1alpha1/job.proto | 1 + .../mgmt/v1alpha1/job-service/jobs_test.go | 93 ++++-------- .../userdefined_transformers.go | 4 +- backend/sql/postgresql/models/models.go | 12 +- backend/sql/postgresql/models/transformers.go | 32 ++-- docs/openapi/mgmt/v1alpha1/job.openapi.yaml | 1 + docs/openapi/neosync.mgmt.v1alpha1.yaml | 3 + docs/protos/mgmt/v1alpha1/job.proto.mdx | 2 +- docs/protos/proto_docs.json | 2 +- .../web/app/(mgmt)/[account]/jobs/util.ts | 33 ++--- .../components/jobs/SchemaTable/RowAlert.tsx | 17 +-- .../jobs/SchemaTable/SchemaColumns.tsx | 11 +- .../jobs/SchemaTable/SchemaTableToolBar.tsx | 33 +++-- .../jobs/SchemaTable/TransformerSelect.tsx | 7 +- .../SchemaTable/schema-constraint-handler.ts | 2 +- .../jobs/SchemaTable/transformer-handler.ts | 23 ++- frontend/apps/web/util/util.ts | 11 +- frontend/apps/web/yup-validations/jobs.ts | 3 - .../sdk/src/client/mgmt/v1alpha1/job_pb.ts | 2 + .../benthos/default_transform/processor.go | 15 +- .../benthos-builder_test.go | 41 +++--- .../gen-benthos-configs/processors.go | 138 ++++++++++-------- .../gen-benthos-configs/processors_test.go | 14 +- .../activities/gen-benthos-configs/sync.go | 24 ++- .../gen-benthos-configs/sync_test.go | 10 +- .../activities/gen-benthos-configs/utils.go | 32 +++- .../init-statement-builder_test.go | 15 -- .../workflow/workflow_integration_test.go | 26 +--- 29 files changed, 296 insertions(+), 312 deletions(-) diff --git a/backend/gen/go/protos/mgmt/v1alpha1/job.pb.go b/backend/gen/go/protos/mgmt/v1alpha1/job.pb.go index dec8c50657..6a5f946d1e 100644 --- a/backend/gen/go/protos/mgmt/v1alpha1/job.pb.go +++ b/backend/gen/go/protos/mgmt/v1alpha1/job.pb.go @@ -3261,6 +3261,7 @@ type JobMappingTransformer struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + // @deprecated - This is no longer used in favor just providing the TransformerConfig Source TransformerSource `protobuf:"varint,1,opt,name=source,proto3,enum=mgmt.v1alpha1.TransformerSource" json:"source,omitempty"` Config *TransformerConfig `protobuf:"bytes,3,opt,name=config,proto3" json:"config,omitempty"` } diff --git a/backend/protos/mgmt/v1alpha1/job.proto b/backend/protos/mgmt/v1alpha1/job.proto index 1ff66d0a29..6b5ab58fd6 100644 --- a/backend/protos/mgmt/v1alpha1/job.proto +++ b/backend/protos/mgmt/v1alpha1/job.proto @@ -352,6 +352,7 @@ message CreateJobResponse { } message JobMappingTransformer { + // @deprecated - This is no longer used in favor just providing the TransformerConfig TransformerSource source = 1; TransformerConfig config = 3; } diff --git a/backend/services/mgmt/v1alpha1/job-service/jobs_test.go b/backend/services/mgmt/v1alpha1/job-service/jobs_test.go index 99f9772a2f..c2ee2b76e0 100644 --- a/backend/services/mgmt/v1alpha1/job-service/jobs_test.go +++ b/backend/services/mgmt/v1alpha1/job-service/jobs_test.go @@ -403,12 +403,10 @@ func Test_CreateJob(t *testing.T) { }, Mappings: []*pg_models.JobMapping{ {Schema: "schema-1", Table: "table-1", Column: "col", JobMappingTransformer: &pg_models.JobMappingTransformerModel{ - Source: int32(mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH), - Config: &pg_models.TransformerConfigs{}, + Config: &pg_models.TransformerConfig{Passthrough: &pg_models.PassthroughConfig{}}, }}, {Schema: "schema-2", Table: "table-2", Column: "col", JobMappingTransformer: &pg_models.JobMappingTransformerModel{ - Source: int32(mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH), - Config: &pg_models.TransformerConfigs{}, + Config: &pg_models.TransformerConfig{Passthrough: &pg_models.PassthroughConfig{}}, }}, }, VirtualForeignKeys: []*pg_models.VirtualForeignConstraint{}, @@ -462,12 +460,10 @@ func Test_CreateJob(t *testing.T) { }, Mappings: []*mgmtv1alpha1.JobMapping{ {Schema: "schema-1", Table: "table-1", Column: "col", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, {Schema: "schema-2", Table: "table-2", Column: "col", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, }, WorkflowOptions: &mgmtv1alpha1.WorkflowOptions{}, @@ -548,12 +544,10 @@ func Test_CreateJob_Schedule_Creation_Error(t *testing.T) { }, Mappings: []*pg_models.JobMapping{ {Schema: "schema-1", Table: "table-1", Column: "col", JobMappingTransformer: &pg_models.JobMappingTransformerModel{ - Source: int32(mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH), - Config: &pg_models.TransformerConfigs{}, + Config: &pg_models.TransformerConfig{Passthrough: &pg_models.PassthroughConfig{}}, }}, {Schema: "schema-2", Table: "table-2", Column: "col", JobMappingTransformer: &pg_models.JobMappingTransformerModel{ - Source: int32(mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH), - Config: &pg_models.TransformerConfigs{}, + Config: &pg_models.TransformerConfig{Passthrough: &pg_models.PassthroughConfig{}}, }}, }, VirtualForeignKeys: []*pg_models.VirtualForeignConstraint{}, @@ -609,12 +603,10 @@ func Test_CreateJob_Schedule_Creation_Error(t *testing.T) { }, Mappings: []*mgmtv1alpha1.JobMapping{ {Schema: "schema-1", Table: "table-1", Column: "col", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, {Schema: "schema-2", Table: "table-2", Column: "col", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, }, WorkflowOptions: &mgmtv1alpha1.WorkflowOptions{}, @@ -697,12 +689,10 @@ func Test_CreateJob_Schedule_Creation_Error_JobCleanup_Error(t *testing.T) { }, Mappings: []*pg_models.JobMapping{ {Schema: "schema-1", Table: "table-1", Column: "col", JobMappingTransformer: &pg_models.JobMappingTransformerModel{ - Source: int32(mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH), - Config: &pg_models.TransformerConfigs{}, + Config: &pg_models.TransformerConfig{Passthrough: &pg_models.PassthroughConfig{}}, }}, {Schema: "schema-2", Table: "table-2", Column: "col", JobMappingTransformer: &pg_models.JobMappingTransformerModel{ - Source: int32(mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH), - Config: &pg_models.TransformerConfigs{}, + Config: &pg_models.TransformerConfig{Passthrough: &pg_models.PassthroughConfig{}}, }}, }, VirtualForeignKeys: []*pg_models.VirtualForeignConstraint{}, @@ -758,12 +748,10 @@ func Test_CreateJob_Schedule_Creation_Error_JobCleanup_Error(t *testing.T) { }, Mappings: []*mgmtv1alpha1.JobMapping{ {Schema: "schema-1", Table: "table-1", Column: "col", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, {Schema: "schema-2", Table: "table-2", Column: "col", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, }, WorkflowOptions: &mgmtv1alpha1.WorkflowOptions{}, @@ -1001,8 +989,7 @@ func Test_UpdateJobSourceConnection_Success(t *testing.T) { ID: job.ID, Mappings: []*pg_models.JobMapping{ {Schema: "schema-1", Table: "table-1", Column: "col", JobMappingTransformer: &pg_models.JobMappingTransformerModel{ - Source: int32(mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH), - Config: &pg_models.TransformerConfigs{}, + Config: &pg_models.TransformerConfig{Passthrough: &pg_models.PassthroughConfig{}}, }}, }, UpdatedByID: userUuid, @@ -1047,8 +1034,7 @@ func Test_UpdateJobSourceConnection_Success(t *testing.T) { }, Mappings: []*mgmtv1alpha1.JobMapping{ {Schema: "schema-1", Table: "table-1", Column: "col", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, }, }, @@ -1089,8 +1075,7 @@ func Test_UpdateJobSourceConnection_GenerateSuccess(t *testing.T) { ID: job.ID, Mappings: []*pg_models.JobMapping{ {Schema: "schema-1", Table: "table-1", Column: "col", JobMappingTransformer: &pg_models.JobMappingTransformerModel{ - Source: int32(mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH), - Config: &pg_models.TransformerConfigs{}, + Config: &pg_models.TransformerConfig{Passthrough: &pg_models.PassthroughConfig{}}, }}, }, UpdatedByID: userUuid, @@ -1133,8 +1118,7 @@ func Test_UpdateJobSourceConnection_GenerateSuccess(t *testing.T) { }, Mappings: []*mgmtv1alpha1.JobMapping{ {Schema: "schema-1", Table: "table-1", Column: "col", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, }, }, @@ -1190,8 +1174,7 @@ func Test_UpdateJobSourceConnection_PgMismatchError(t *testing.T) { }, Mappings: []*mgmtv1alpha1.JobMapping{ {Schema: "schema-1", Table: "table-1", Column: "col", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, }, }, @@ -1248,8 +1231,7 @@ func Test_UpdateJobSourceConnection_MysqlMismatchError(t *testing.T) { }, Mappings: []*mgmtv1alpha1.JobMapping{ {Schema: "schema-1", Table: "table-1", Column: "col", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, }, }, @@ -1306,8 +1288,7 @@ func Test_UpdateJobSourceConnection_AwsS3MismatchError(t *testing.T) { }, Mappings: []*mgmtv1alpha1.JobMapping{ {Schema: "schema-1", Table: "table-1", Column: "col", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, }, }, @@ -1829,28 +1810,22 @@ func Test_ValidateJobMappings_NoValidationErrors(t *testing.T) { ConnectionId: connId, Mappings: []*mgmtv1alpha1.JobMapping{ {Schema: "public", Table: "orders", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, {Schema: "public", Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, {Schema: "circle", Table: "table_1", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, {Schema: "circle", Table: "table_1", Column: "table2_id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, {Schema: "circle", Table: "table_2", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, {Schema: "circle", Table: "table_2", Column: "table1_id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, }, }, @@ -1904,28 +1879,22 @@ func Test_ValidateJobMappings_ValidationErrors(t *testing.T) { ConnectionId: connId, Mappings: []*mgmtv1alpha1.JobMapping{ {Schema: "public", Table: "orders", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, {Schema: "account", Table: "accounts", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, {Schema: "circle", Table: "table_1", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, {Schema: "circle", Table: "table_1", Column: "table2_id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, {Schema: "circle", Table: "table_2", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, {Schema: "circle", Table: "table_2", Column: "table1_id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}, }}, }, }, diff --git a/backend/services/mgmt/v1alpha1/transformers-service/userdefined_transformers.go b/backend/services/mgmt/v1alpha1/transformers-service/userdefined_transformers.go index 7db6180acf..c9d29486f4 100644 --- a/backend/services/mgmt/v1alpha1/transformers-service/userdefined_transformers.go +++ b/backend/services/mgmt/v1alpha1/transformers-service/userdefined_transformers.go @@ -91,7 +91,7 @@ func (s *Service) CreateUserDefinedTransformer(ctx context.Context, req *connect AccountID: *accountUuid, Name: req.Msg.Name, Description: req.Msg.Description, - TransformerConfig: &pg_models.TransformerConfigs{}, + TransformerConfig: &pg_models.TransformerConfig{}, Source: int32(req.Msg.Source), CreatedByID: *userUuid, UpdatedByID: *userUuid, @@ -173,7 +173,7 @@ func (s *Service) UpdateUserDefinedTransformer(ctx context.Context, req *connect updateParams := &db_queries.UpdateUserDefinedTransformerParams{ Name: req.Msg.Name, Description: req.Msg.Description, - TransformerConfig: &pg_models.TransformerConfigs{}, + TransformerConfig: &pg_models.TransformerConfig{}, UpdatedByID: *userUuid, ID: tUuid, } diff --git a/backend/sql/postgresql/models/models.go b/backend/sql/postgresql/models/models.go index 5e38430811..1e820ca3fb 100644 --- a/backend/sql/postgresql/models/models.go +++ b/backend/sql/postgresql/models/models.go @@ -942,26 +942,22 @@ func (s *DynamoDBSourceOptions) ToDto() *mgmtv1alpha1.DynamoDBSourceConnectionOp if s.UnmappedTransforms == nil { s.UnmappedTransforms = &DynamoDBSourceUnmappedTransformConfig{ B: &JobMappingTransformerModel{ - Source: int32(mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH), - Config: &TransformerConfigs{ + Config: &TransformerConfig{ Passthrough: &PassthroughConfig{}, }, }, Boolean: &JobMappingTransformerModel{ - Source: int32(mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_BOOL), - Config: &TransformerConfigs{ + Config: &TransformerConfig{ GenerateBool: &GenerateBoolConfig{}, }, }, N: &JobMappingTransformerModel{ - Source: int32(mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH), - Config: &TransformerConfigs{ + Config: &TransformerConfig{ Passthrough: &PassthroughConfig{}, }, }, S: &JobMappingTransformerModel{ - Source: int32(mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_RANDOM_STRING), - Config: &TransformerConfigs{ + Config: &TransformerConfig{ GenerateString: &GenerateStringConfig{}, }, }, diff --git a/backend/sql/postgresql/models/transformers.go b/backend/sql/postgresql/models/transformers.go index f9610ec19b..07d4428317 100644 --- a/backend/sql/postgresql/models/transformers.go +++ b/backend/sql/postgresql/models/transformers.go @@ -5,11 +5,10 @@ import ( ) type JobMappingTransformerModel struct { - Source int32 `json:"source"` - Config *TransformerConfigs `json:"config,omitempty"` + Config *TransformerConfig `json:"config,omitempty"` } -type TransformerConfigs struct { +type TransformerConfig struct { GenerateEmail *GenerateEmailConfig `json:"generateEmailConfig,omitempty"` TransformEmail *TransformEmailConfig `json:"transformEmail,omitempty"` GenerateBool *GenerateBoolConfig `json:"generateBool,omitempty"` @@ -205,22 +204,20 @@ type GenerateCountryConfig struct { GenerateFullName *bool `json:"generateFullName,omitempty"` } -// from API -> DB func (t *JobMappingTransformerModel) FromTransformerDto(tr *mgmtv1alpha1.JobMappingTransformer) error { - t.Source = int32(tr.Source) - - config := &TransformerConfigs{} + if tr == nil { + tr = &mgmtv1alpha1.JobMappingTransformer{} + } + config := &TransformerConfig{} if err := config.FromTransformerConfigDto(tr.GetConfig()); err != nil { return err } - t.Config = config - return nil } -func (t *TransformerConfigs) FromTransformerConfigDto(tr *mgmtv1alpha1.TransformerConfig) error { +func (t *TransformerConfig) FromTransformerConfigDto(tr *mgmtv1alpha1.TransformerConfig) error { if tr == nil { tr = &mgmtv1alpha1.TransformerConfig{} } @@ -378,31 +375,22 @@ func (t *TransformerConfigs) FromTransformerConfigDto(tr *mgmtv1alpha1.Transform GenerateFullName: tr.GetGenerateCountryConfig().GenerateFullName, } default: - t = &TransformerConfigs{} + t = &TransformerConfig{} } return nil } -// DB -> API func (t *JobMappingTransformerModel) ToTransformerDto() *mgmtv1alpha1.JobMappingTransformer { - _, ok := mgmtv1alpha1.TransformerSource_name[t.Source] - var source mgmtv1alpha1.TransformerSource - if !ok { - source = mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_UNSPECIFIED - } else { - source = mgmtv1alpha1.TransformerSource(t.Source) - } if t.Config == nil { - t.Config = &TransformerConfigs{} + t.Config = &TransformerConfig{} } return &mgmtv1alpha1.JobMappingTransformer{ - Source: source, Config: t.Config.ToTransformerConfigDto(), } } -func (t *TransformerConfigs) ToTransformerConfigDto() *mgmtv1alpha1.TransformerConfig { +func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerConfig { switch { case t.GenerateEmail != nil: return &mgmtv1alpha1.TransformerConfig{ diff --git a/docs/openapi/mgmt/v1alpha1/job.openapi.yaml b/docs/openapi/mgmt/v1alpha1/job.openapi.yaml index 065e94d7de..fadc17e0c4 100644 --- a/docs/openapi/mgmt/v1alpha1/job.openapi.yaml +++ b/docs/openapi/mgmt/v1alpha1/job.openapi.yaml @@ -2807,6 +2807,7 @@ components: source: allOf: - title: source + description: '@deprecated - This is no longer used in favor just providing the TransformerConfig' - $ref: '#/components/schemas/mgmt.v1alpha1.TransformerSource' config: allOf: diff --git a/docs/openapi/neosync.mgmt.v1alpha1.yaml b/docs/openapi/neosync.mgmt.v1alpha1.yaml index 0575fe75d5..34f9645121 100644 --- a/docs/openapi/neosync.mgmt.v1alpha1.yaml +++ b/docs/openapi/neosync.mgmt.v1alpha1.yaml @@ -9540,6 +9540,9 @@ components: source: allOf: - title: source + description: >- + @deprecated - This is no longer used in favor just providing the + TransformerConfig - $ref: '#/components/schemas/mgmt.v1alpha1.TransformerSource' config: allOf: diff --git a/docs/protos/mgmt/v1alpha1/job.proto.mdx b/docs/protos/mgmt/v1alpha1/job.proto.mdx index 737900505b..a7ef659470 100644 --- a/docs/protos/mgmt/v1alpha1/job.proto.mdx +++ b/docs/protos/mgmt/v1alpha1/job.proto.mdx @@ -270,7 +270,7 @@ _**package** mgmt.v1alpha1_ ### `JobMappingTransformer` - + ### `JobNextRuns` diff --git a/docs/protos/proto_docs.json b/docs/protos/proto_docs.json index 54110142b1..c9e4e52141 100644 --- a/docs/protos/proto_docs.json +++ b/docs/protos/proto_docs.json @@ -11195,7 +11195,7 @@ "fields": [ { "name": "source", - "description": "", + "description": "@deprecated - This is no longer used in favor just providing the TransformerConfig", "label": "", "type": "TransformerSource", "longType": "TransformerSource", diff --git a/frontend/apps/web/app/(mgmt)/[account]/jobs/util.ts b/frontend/apps/web/app/(mgmt)/[account]/jobs/util.ts index 4c2aeb54c9..828f4193dd 100644 --- a/frontend/apps/web/app/(mgmt)/[account]/jobs/util.ts +++ b/frontend/apps/web/app/(mgmt)/[account]/jobs/util.ts @@ -71,7 +71,6 @@ import { PostgresTruncateTableConfig, RetryPolicy, TransformerConfig, - TransformerSource, ValidateJobMappingsRequest, ValidateJobMappingsResponse, VirtualForeignConstraint, @@ -702,7 +701,6 @@ export function getDefaultUnmappedTransformConfig(): DynamoDBSourceUnmappedTrans return { boolean: convertJobMappingTransformerToForm( new JobMappingTransformer({ - source: TransformerSource.GENERATE_BOOL, config: new TransformerConfig({ config: { case: 'generateBoolConfig', @@ -713,7 +711,6 @@ export function getDefaultUnmappedTransformConfig(): DynamoDBSourceUnmappedTrans ), byte: convertJobMappingTransformerToForm( new JobMappingTransformer({ - source: TransformerSource.PASSTHROUGH, config: new TransformerConfig({ config: { case: 'passthroughConfig', @@ -724,7 +721,6 @@ export function getDefaultUnmappedTransformConfig(): DynamoDBSourceUnmappedTrans ), n: convertJobMappingTransformerToForm( new JobMappingTransformer({ - source: TransformerSource.PASSTHROUGH, config: new TransformerConfig({ config: { case: 'passthroughConfig', @@ -735,7 +731,6 @@ export function getDefaultUnmappedTransformConfig(): DynamoDBSourceUnmappedTrans ), s: convertJobMappingTransformerToForm( new JobMappingTransformer({ - source: TransformerSource.GENERATE_RANDOM_STRING, config: new TransformerConfig({ config: { case: 'generateStringConfig', @@ -860,7 +855,6 @@ export function toDynamoDbSourceUnmappedOptionsFormValues( boolean: convertJobMappingTransformerToForm( ut.boolean || new JobMappingTransformer({ - source: TransformerSource.GENERATE_BOOL, config: new TransformerConfig({ config: { case: 'generateBoolConfig', @@ -872,7 +866,6 @@ export function toDynamoDbSourceUnmappedOptionsFormValues( byte: convertJobMappingTransformerToForm( ut.b || new JobMappingTransformer({ - source: TransformerSource.PASSTHROUGH, config: new TransformerConfig({ config: { case: 'passthroughConfig', @@ -884,7 +877,6 @@ export function toDynamoDbSourceUnmappedOptionsFormValues( n: convertJobMappingTransformerToForm( ut.n || new JobMappingTransformer({ - source: TransformerSource.PASSTHROUGH, config: new TransformerConfig({ config: { case: 'passthroughConfig', @@ -896,7 +888,6 @@ export function toDynamoDbSourceUnmappedOptionsFormValues( s: convertJobMappingTransformerToForm( ut.s || new JobMappingTransformer({ - source: TransformerSource.GENERATE_RANDOM_STRING, config: new TransformerConfig({ config: { case: 'generateStringConfig', @@ -1521,20 +1512,18 @@ export async function validateJobMapping( schema: m.schema, table: m.table, column: m.column, - transformer: - m.transformer.source != 0 - ? convertJobMappingTransformerFormToJobMappingTransformer( - m.transformer - ) - : new JobMappingTransformer({ - source: 1, - config: new TransformerConfig({ - config: { - case: 'passthroughConfig', - value: {}, - }, - }), + transformer: m.transformer.config.case + ? convertJobMappingTransformerFormToJobMappingTransformer( + m.transformer + ) + : new JobMappingTransformer({ + config: new TransformerConfig({ + config: { + case: 'passthroughConfig', + value: {}, + }, }), + }), }); }), virtualForeignKeys: virtualForeignKeys.map((v) => { diff --git a/frontend/apps/web/components/jobs/SchemaTable/RowAlert.tsx b/frontend/apps/web/components/jobs/SchemaTable/RowAlert.tsx index 1913a80456..ea145db7a7 100644 --- a/frontend/apps/web/components/jobs/SchemaTable/RowAlert.tsx +++ b/frontend/apps/web/components/jobs/SchemaTable/RowAlert.tsx @@ -5,24 +5,21 @@ import { TooltipTrigger, } from '@/components/ui/tooltip'; import { ExclamationTriangleIcon } from '@radix-ui/react-icons'; -import { Row } from '@tanstack/react-table'; import { ReactElement } from 'react'; -import { SchemaConstraintHandler } from './schema-constraint-handler'; +import { + ColumnKey, + SchemaConstraintHandler, +} from './schema-constraint-handler'; interface Props { - row: Row<{ schema: string; table: string; column: string }>; + rowKey: ColumnKey; handler: SchemaConstraintHandler; onRemoveClick(): void; } export default function SchemaRowAlert(props: Props): ReactElement { - const { row, handler, onRemoveClick } = props; - const key = { - schema: row.getValue('schema'), - table: row.getValue('table'), - column: row.getValue('column'), - }; - const isInSchema = handler.getIsInSchema(key); + const { rowKey, handler, onRemoveClick } = props; + const isInSchema = handler.getIsInSchema(rowKey); const messages: string[] = []; diff --git a/frontend/apps/web/components/jobs/SchemaTable/SchemaColumns.tsx b/frontend/apps/web/components/jobs/SchemaTable/SchemaColumns.tsx index 45ca263f9f..ca383a4078 100644 --- a/frontend/apps/web/components/jobs/SchemaTable/SchemaColumns.tsx +++ b/frontend/apps/web/components/jobs/SchemaTable/SchemaColumns.tsx @@ -127,10 +127,15 @@ export function getSchemaColumns(props: Props): ColumnDef[] { control: form.control, name: 'mappings', }); + const columnKey: ColumnKey = { + schema: row.getValue('schema'), + table: row.getValue('table'), + column: row.getValue('column'), + }; return (
remove(row.index)} /> @@ -418,8 +423,8 @@ export function getSchemaColumns(props: Props): ColumnDef[] { // row.original works here. There must be a caching bug with the transformer prop being an object. // This may be related: https://github.com/TanStack/table/issues/5363 const rowVal = row.original.transformer; - const tsource = transformerHandler.getSystemTransformerBySource( - rowVal.source + const tsource = transformerHandler.getSystemTransformerByConfigCase( + rowVal.config.case ); const sourceName = tsource?.name.toLowerCase() ?? 'select transformer'; return sourceName.includes((value as string)?.toLowerCase()); diff --git a/frontend/apps/web/components/jobs/SchemaTable/SchemaTableToolBar.tsx b/frontend/apps/web/components/jobs/SchemaTable/SchemaTableToolBar.tsx index f638580453..62b8745e80 100644 --- a/frontend/apps/web/components/jobs/SchemaTable/SchemaTableToolBar.tsx +++ b/frontend/apps/web/components/jobs/SchemaTable/SchemaTableToolBar.tsx @@ -36,7 +36,6 @@ import { Passthrough, SystemTransformer, TransformerConfig, - TransformerSource, UserDefinedTransformer, } from '@neosync/sdk'; import { CheckIcon, Cross2Icon } from '@radix-ui/react-icons'; @@ -47,7 +46,10 @@ import { Row as RowData } from './SchemaPageTable'; import { SchemaTableViewOptions } from './SchemaTableViewOptions'; import TransformerSelect from './TransformerSelect'; import { JobType, SchemaConstraintHandler } from './schema-constraint-handler'; -import { TransformerHandler } from './transformer-handler'; +import { + TransformerConfigCase, + TransformerHandler, +} from './transformer-handler'; interface DataTableToolbarProps { table: Table; @@ -227,7 +229,7 @@ export function SchemaTableToolbar({ formMappings.forEach((fm, idx) => { // skips setting the default transformer if the user has already set the transformer if ( - fm.transformer.source != 0 && + fm.transformer.config.case && !defaultTransformerValues.overrideTransformers ) { return; @@ -244,7 +246,6 @@ export function SchemaTableToolbar({ const newJm = isGenerated && !identityType ? new JobMappingTransformer({ - source: TransformerSource.GENERATE_DEFAULT, config: new TransformerConfig({ config: { case: 'generateDefaultConfig', @@ -253,7 +254,6 @@ export function SchemaTableToolbar({ }), }) : new JobMappingTransformer({ - source: TransformerSource.PASSTHROUGH, config: new TransformerConfig({ config: { case: 'passthroughConfig', @@ -330,7 +330,9 @@ function getFilteredTransformersForBulkSet( const uniqueSystemSources = findCommonSystemSources(systemArrays); const uniqueSystem = uniqueSystemSources - .map((source) => transformerHandler.getSystemTransformerBySource(source)) + .map((source) => + transformerHandler.getSystemTransformerByConfigCase(source) + ) .filter((x): x is SystemTransformer => !!x); const uniqueIds = findCommonUserDefinedIds(userDefinedArrays); @@ -346,28 +348,31 @@ function getFilteredTransformersForBulkSet( function findCommonSystemSources( arrays: SystemTransformer[][] -): TransformerSource[] { - const elementCount: Record = {} as Record< - TransformerSource, +): TransformerConfigCase[] { + const elementCount: Record = {} as Record< + TransformerConfigCase, number >; const subArrayCount = arrays.length; - const commonElements: TransformerSource[] = []; + const commonElements: TransformerConfigCase[] = []; arrays.forEach((subArray) => { // Use a Set to ensure each element in a sub-array is counted only once new Set(subArray).forEach((element) => { - if (!elementCount[element.source]) { - elementCount[element.source] = 1; + if (!element.config?.config.case) { + return; + } + if (!elementCount[element.config.config.case]) { + elementCount[element.config.config.case] = 1; } else { - elementCount[element.source]++; + elementCount[element.config.config.case]++; } }); }); for (const [element, count] of Object.entries(elementCount)) { if (count === subArrayCount) { - commonElements.push(+element as TransformerSource); + commonElements.push(element as TransformerConfigCase); } } diff --git a/frontend/apps/web/components/jobs/SchemaTable/TransformerSelect.tsx b/frontend/apps/web/components/jobs/SchemaTable/TransformerSelect.tsx index 6b14117da6..34a80e1d0f 100644 --- a/frontend/apps/web/components/jobs/SchemaTable/TransformerSelect.tsx +++ b/frontend/apps/web/components/jobs/SchemaTable/TransformerSelect.tsx @@ -22,7 +22,6 @@ import { JobMappingTransformer, SystemTransformer, TransformerConfig, - TransformerSource, UserDefinedTransformer, UserDefinedTransformerConfig, } from '@neosync/sdk'; @@ -101,7 +100,6 @@ export default function TransformerSelect(props: Props): ReactElement { onSelect( convertJobMappingTransformerToForm( new JobMappingTransformer({ - source: TransformerSource.USER_DEFINED, config: new TransformerConfig({ config: { case: 'userDefinedTransformerConfig', @@ -124,8 +122,6 @@ export default function TransformerSelect(props: Props): ReactElement { 'mr-2 h-4 w-4', value?.config?.case === 'userDefinedTransformerConfig' && - value?.source === - TransformerSource.USER_DEFINED && value.config.value.id === t.id ? 'opacity-100' : 'opacity-0' @@ -151,7 +147,6 @@ export default function TransformerSelect(props: Props): ReactElement { onSelect( convertJobMappingTransformerToForm( new JobMappingTransformer({ - source: t.source, config: t.config, }) ) @@ -165,7 +160,7 @@ export default function TransformerSelect(props: Props): ReactElement { = T extends { case: infer U } ? U : never; + +// Computed type that extracts all case types from the config union +export type TransformerConfigCase = NonNullable< + ExtractCase +>; + export class TransformerHandler { private readonly systemTransformers: SystemTransformer[]; private readonly userDefinedTransformers: UserDefinedTransformer[]; + private readonly systemByType: Map; private readonly systemBySource: Map; private readonly userDefinedById: Map; @@ -21,6 +31,13 @@ export class TransformerHandler { this.systemTransformers = systemTransformers; this.userDefinedTransformers = userDefinedTransformers; + this.systemByType = new Map( + // todo + systemTransformers.map((t) => [ + t.config?.config.case ?? 'generateEmailConfig', + t, + ]) + ); this.systemBySource = new Map(systemTransformers.map((t) => [t.source, t])); this.userDefinedById = new Map( userDefinedTransformers.map((t) => [t.id, t]) @@ -66,10 +83,10 @@ export class TransformerHandler { }; } - public getSystemTransformerBySource( - source: TransformerSource + public getSystemTransformerByConfigCase( + caseType: TransformerConfigCase | string ): SystemTransformer | undefined { - return this.systemBySource.get(source); + return this.systemByType.get(caseType as TransformerConfigCase); } public getUserDefinedTransformerById( diff --git a/frontend/apps/web/util/util.ts b/frontend/apps/web/util/util.ts index f665d25cd3..80220d2ca7 100644 --- a/frontend/apps/web/util/util.ts +++ b/frontend/apps/web/util/util.ts @@ -150,24 +150,21 @@ export function getTransformerFromField( handler: TransformerHandler, value: JobMappingTransformerForm ): Transformer { - if ( - value.source === TransformerSource.USER_DEFINED && - value.config.case === 'userDefinedTransformerConfig' - ) { + if (value.config.case === 'userDefinedTransformerConfig') { return ( handler.getUserDefinedTransformerById(value.config.value.id) ?? new SystemTransformer() ); } return ( - handler.getSystemTransformerBySource(value.source) ?? + handler.getSystemTransformerByConfigCase(value.config.case) ?? new SystemTransformer() ); } -// Checks to see if the source is unspecified +// Checks to see if the config is unspecified export function isInvalidTransformer(transformer: Transformer): boolean { - return transformer.source === TransformerSource.UNSPECIFIED; + return transformer.config == null; } export function getTransformerSelectButtonText( diff --git a/frontend/apps/web/yup-validations/jobs.ts b/frontend/apps/web/yup-validations/jobs.ts index aff47f39d2..9342d0215b 100644 --- a/frontend/apps/web/yup-validations/jobs.ts +++ b/frontend/apps/web/yup-validations/jobs.ts @@ -5,7 +5,6 @@ import { getDurationValidateFn } from './number'; // Yup schema form JobMappingTransformers export const JobMappingTransformerForm = Yup.object({ - source: Yup.number().required('A valid transformer source must be specified'), config: TransformerConfigSchema, }); @@ -18,7 +17,6 @@ export function convertJobMappingTransformerToForm( jmt: JobMappingTransformer ): JobMappingTransformerForm { return { - source: jmt.source, config: convertTransformerConfigToForm(jmt.config), }; } @@ -26,7 +24,6 @@ export function convertJobMappingTransformerFormToJobMappingTransformer( form: JobMappingTransformerForm ): JobMappingTransformer { return new JobMappingTransformer({ - source: form.source, config: convertTransformerConfigSchemaToTransformerConfig(form.config), }); } diff --git a/frontend/packages/sdk/src/client/mgmt/v1alpha1/job_pb.ts b/frontend/packages/sdk/src/client/mgmt/v1alpha1/job_pb.ts index c10c4f995c..e9f4c7f423 100644 --- a/frontend/packages/sdk/src/client/mgmt/v1alpha1/job_pb.ts +++ b/frontend/packages/sdk/src/client/mgmt/v1alpha1/job_pb.ts @@ -2615,6 +2615,8 @@ export class CreateJobResponse extends Message { */ export class JobMappingTransformer extends Message { /** + * @deprecated - This is no longer used in favor just providing the TransformerConfig + * * @generated from field: mgmt.v1alpha1.TransformerSource source = 1; */ source = TransformerSource.UNSPECIFIED; diff --git a/worker/pkg/benthos/default_transform/processor.go b/worker/pkg/benthos/default_transform/processor.go index 85e7b108b1..da5ee5d041 100644 --- a/worker/pkg/benthos/default_transform/processor.go +++ b/worker/pkg/benthos/default_transform/processor.go @@ -219,9 +219,18 @@ func initDefaultTransformers(defaultTransformerMap map[primitiveType]*mgmtv1alph } func shouldProcess(t *mgmtv1alpha1.JobMappingTransformer) bool { - return t != nil && - t.Source != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_UNSPECIFIED && - t.Source != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH + if t == nil { + return false + } + + source := + t.GetSource() != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_UNSPECIFIED && + t.GetSource() != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH + + config := t.GetConfig().GetConfig() + passthrough := t.GetConfig().GetPassthroughConfig() + // we only want to process if there is a non-nil config that is not passthrough + return source || (config != nil && passthrough == nil) } func dereferenceValue(value any) any { diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go index 63bc7f1a53..9c0842b54f 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go @@ -46,7 +46,9 @@ func Test_ProcessorConfigEmpty(t *testing.T) { Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + Config: &mgmtv1alpha1.TransformerConfig{ + Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}, + }, }, }, { @@ -54,7 +56,9 @@ func Test_ProcessorConfigEmpty(t *testing.T) { Table: "users", Column: "name", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_UNSPECIFIED, + Config: &mgmtv1alpha1.TransformerConfig{ + Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}, + }, }, }, }, @@ -130,8 +134,9 @@ func Test_ProcessorConfigEmptyJavascript(t *testing.T) { Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{ + Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}, + }, }, }, { @@ -139,7 +144,6 @@ func Test_ProcessorConfigEmptyJavascript(t *testing.T) { Table: "users", Column: "name", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformJavascriptConfig{ TransformJavascriptConfig: &mgmtv1alpha1.TransformJavascript{Code: ""}, @@ -344,19 +348,21 @@ func Test_buildProcessorConfigsMutation(t *testing.T) { require.Empty(t, output) output, err = buildProcessorConfigs(ctx, mockTransformerClient, []*mgmtv1alpha1.JobMapping{ - {Schema: "public", Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH}}, + {Schema: "public", Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{Config: &mgmtv1alpha1.TransformerConfig{ + Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}, + }}}, }, map[string]*sqlmanager_shared.ColumnInfo{}, map[string][]*referenceKey{}, []string{}, mockJobId, mockRunId, nil, runconfig, nil, []string{}) require.Nil(t, err) require.Empty(t, output) runconfig = tabledependency.NewRunConfig("public.users", tabledependency.RunTypeInsert, []string{}, nil, []string{}, []string{"id", "name"}, []*tabledependency.DependsOn{}, false) output, err = buildProcessorConfigs(ctx, mockTransformerClient, []*mgmtv1alpha1.JobMapping{ - {Schema: "public", Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_NULL, Config: &mgmtv1alpha1.TransformerConfig{ + {Schema: "public", Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_Nullconfig{ Nullconfig: &mgmtv1alpha1.Null{}, }, }}}, - {Schema: "public", Table: "users", Column: "name", Transformer: &mgmtv1alpha1.JobMappingTransformer{Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_NULL, Config: &mgmtv1alpha1.TransformerConfig{ + {Schema: "public", Table: "users", Column: "name", Transformer: &mgmtv1alpha1.JobMappingTransformer{Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_Nullconfig{ Nullconfig: &mgmtv1alpha1.Null{}, }, @@ -397,7 +403,7 @@ func Test_buildProcessorConfigsMutation(t *testing.T) { runconfig = tabledependency.NewRunConfig("public.users", tabledependency.RunTypeInsert, []string{"id"}, nil, []string{"email"}, []string{"email"}, []*tabledependency.DependsOn{}, false) output, err = buildProcessorConfigs(ctx, mockTransformerClient, []*mgmtv1alpha1.JobMapping{ - {Schema: "public", Table: "users", Column: "email", Transformer: &mgmtv1alpha1.JobMappingTransformer{Source: jsT.Source, Config: jsT.Config}}}, groupedSchemas, map[string][]*referenceKey{}, []string{}, mockJobId, mockRunId, nil, runconfig, nil, []string{}) + {Schema: "public", Table: "users", Column: "email", Transformer: &mgmtv1alpha1.JobMappingTransformer{Config: jsT.Config}}}, groupedSchemas, map[string][]*referenceKey{}, []string{}, mockJobId, mockRunId, nil, runconfig, nil, []string{}) require.Nil(t, err) require.Equal(t, `root."email" = transform_email(value:this."email",preserve_length:false,preserve_domain:true,excluded_domains:[],max_length:40,email_type:"uuidv4",invalid_email_action:"reject")`, *output[0].Mutation) @@ -405,10 +411,9 @@ func Test_buildProcessorConfigsMutation(t *testing.T) { func Test_ShouldProcessColumnTrue(t *testing.T) { val := &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_EMAIL, Config: &mgmtv1alpha1.TransformerConfig{ - Config: &mgmtv1alpha1.TransformerConfig_Nullconfig{ - Nullconfig: &mgmtv1alpha1.Null{}, + Config: &mgmtv1alpha1.TransformerConfig_GenerateEmailConfig{ + GenerateEmailConfig: &mgmtv1alpha1.GenerateEmail{}, }, }, } @@ -419,7 +424,6 @@ func Test_ShouldProcessColumnTrue(t *testing.T) { func Test_ShouldProcessColumnFalse(t *testing.T) { val := &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{ PassthroughConfig: &mgmtv1alpha1.Passthrough{}, @@ -448,7 +452,7 @@ func Test_buildProcessorConfigsJavascriptEmpty(t *testing.T) { runconfig := tabledependency.NewRunConfig("public.users", tabledependency.RunTypeInsert, []string{"id"}, nil, []string{"id"}, []string{"id"}, []*tabledependency.DependsOn{}, false) resp, err := buildProcessorConfigs(ctx, mockTransformerClient, []*mgmtv1alpha1.JobMapping{ - {Schema: "public", Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{Source: jsT.Source, Config: jsT.Config}}}, map[string]*sqlmanager_shared.ColumnInfo{}, map[string][]*referenceKey{}, []string{}, mockJobId, mockRunId, nil, runconfig, nil, + {Schema: "public", Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{Config: jsT.Config}}}, map[string]*sqlmanager_shared.ColumnInfo{}, map[string][]*referenceKey{}, []string{}, mockJobId, mockRunId, nil, runconfig, nil, []string{}) require.NoError(t, err) @@ -486,7 +490,6 @@ func Test_convertUserDefinedFunctionConfig(t *testing.T) { }), nil) jmt := &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_EMAIL, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_UserDefinedTransformerConfig{ UserDefinedTransformerConfig: &mgmtv1alpha1.UserDefinedTransformerConfig{ @@ -497,7 +500,6 @@ func Test_convertUserDefinedFunctionConfig(t *testing.T) { } expected := &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_EMAIL, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformEmailConfig{ TransformEmailConfig: &mgmtv1alpha1.TransformEmail{ @@ -517,7 +519,7 @@ func Test_convertUserDefinedFunctionConfig(t *testing.T) { func MockJobMappingTransformer(source int32, transformerId string) db_queries.NeosyncApiTransformer { return db_queries.NeosyncApiTransformer{ Source: source, - TransformerConfig: &pg_models.TransformerConfigs{}, + TransformerConfig: &pg_models.TransformerConfig{}, } } @@ -611,7 +613,7 @@ func Test_computeMutationFunction_null(t *testing.T) { val, err := computeMutationFunction( &mgmtv1alpha1.JobMapping{ Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_NULL, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_Nullconfig{}}, }, }, &sqlmanager_shared.ColumnInfo{}, false) require.NoError(t, err) @@ -1010,7 +1012,6 @@ func Test_computeMutationFunction_Validate_Bloblang_Output(t *testing.T) { &mgmtv1alpha1.JobMapping{ Column: "email", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: transformer.Source, Config: transformer.Config, }, }, emailColInfo, false) @@ -1199,7 +1200,6 @@ func Test_computeMutationFunction_Validate_Bloblang_Output_EmptyConfigs(t *testi &mgmtv1alpha1.JobMapping{ Column: "email", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: transformer.Source, Config: transformer.Config, }, }, emailColInfo, false) @@ -1220,7 +1220,6 @@ func Test_computeMutationFunction_handles_Db_Maxlen(t *testing.T) { } jm := &mgmtv1alpha1.JobMapping{ Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_RANDOM_STRING, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateStringConfig{ GenerateStringConfig: &mgmtv1alpha1.GenerateString{ diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors.go index 7b3667ad69..69d4059d1f 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors.go @@ -167,26 +167,27 @@ func extractJsFunctionsAndOutputs(ctx context.Context, transformerclient mgmtv1a var jsFunctions []string for _, col := range cols { - if shouldProcessStrict(col.Transformer) { - if _, ok := col.Transformer.Config.Config.(*mgmtv1alpha1.TransformerConfig_UserDefinedTransformerConfig); ok { - val, err := convertUserDefinedFunctionConfig(ctx, transformerclient, col.Transformer) + jmTransformer := col.GetTransformer() + if shouldProcessStrict(jmTransformer) { + if jmTransformer.GetConfig().GetUserDefinedTransformerConfig() != nil { + val, err := convertUserDefinedFunctionConfig(ctx, transformerclient, col.GetTransformer()) if err != nil { return "", errors.New("unable to look up user defined transformer config by id") } - col.Transformer = val + jmTransformer = val } - switch col.Transformer.Source { - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT: - code := col.Transformer.Config.GetTransformJavascriptConfig().Code + switch cfg := jmTransformer.GetConfig().GetConfig().(type) { + case *mgmtv1alpha1.TransformerConfig_TransformJavascriptConfig: + code := cfg.TransformJavascriptConfig.GetCode() if code != "" { - jsFunctions = append(jsFunctions, constructJsFunction(code, col.Column, col.Transformer.Source)) - benthosOutputs = append(benthosOutputs, constructBenthosJavascriptObject(col.Column, col.Transformer.Source)) + jsFunctions = append(jsFunctions, constructJsFunction(code, col.Column, mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT)) + benthosOutputs = append(benthosOutputs, constructBenthosJavascriptObject(col.Column, mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT)) } - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_JAVASCRIPT: - code := col.Transformer.Config.GetGenerateJavascriptConfig().Code + case *mgmtv1alpha1.TransformerConfig_GenerateJavascriptConfig: + code := cfg.GenerateJavascriptConfig.GetCode() if code != "" { - jsFunctions = append(jsFunctions, constructJsFunction(code, col.Column, col.Transformer.Source)) - benthosOutputs = append(benthosOutputs, constructBenthosJavascriptObject(col.Column, col.Transformer.Source)) + jsFunctions = append(jsFunctions, constructJsFunction(code, col.Column, mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_JAVASCRIPT)) + benthosOutputs = append(benthosOutputs, constructBenthosJavascriptObject(col.Column, mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_JAVASCRIPT)) } } } @@ -199,6 +200,16 @@ func extractJsFunctionsAndOutputs(ctx context.Context, transformerclient mgmtv1a } } +// Checks if it is a gen or transform javascript +func isJavascriptTransformer(jmt *mgmtv1alpha1.JobMappingTransformer) bool { + if jmt == nil { + return false + } + isSource := jmt.GetSource() == mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT || jmt.GetSource() == mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_JAVASCRIPT + isConfig := jmt.GetConfig().GetTransformJavascriptConfig() != nil || jmt.GetConfig().GetGenerateJavascriptConfig() != nil + return isSource || isConfig +} + func buildMutationConfigs( ctx context.Context, transformerclient mgmtv1alpha1connect.TransformersServiceClient, @@ -209,22 +220,22 @@ func buildMutationConfigs( mutations := []string{} for _, col := range cols { - colInfo := tableColumnInfo[col.Column] - if shouldProcessColumn(col.Transformer) { - if _, ok := col.Transformer.Config.Config.(*mgmtv1alpha1.TransformerConfig_UserDefinedTransformerConfig); ok { + colInfo := tableColumnInfo[col.GetColumn()] + if shouldProcessColumn(col.GetTransformer()) { + if col.GetTransformer().GetConfig().GetUserDefinedTransformerConfig() != nil { // handle user defined transformer -> get the user defined transformer configs using the id - val, err := convertUserDefinedFunctionConfig(ctx, transformerclient, col.Transformer) + val, err := convertUserDefinedFunctionConfig(ctx, transformerclient, col.GetTransformer()) if err != nil { return "", errors.New("unable to look up user defined transformer config by id") } col.Transformer = val } - if col.Transformer.Source != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT && col.Transformer.Source != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_JAVASCRIPT { + if isJavascriptTransformer(col.GetTransformer()) { mutation, err := computeMutationFunction(col, colInfo, splitColumnPaths) if err != nil { - return "", fmt.Errorf("%s is not a supported transformer: %w", col.Transformer, err) + return "", fmt.Errorf("%s is not a supported transformer: %w", col.GetTransformer(), err) } - mutations = append(mutations, fmt.Sprintf("root.%s = %s", getBenthosColumnKey(col.Column, splitColumnPaths), mutation)) + mutations = append(mutations, fmt.Sprintf("root.%s = %s", getBenthosColumnKey(col.GetColumn(), splitColumnPaths), mutation)) } } } @@ -409,14 +420,15 @@ func convertUserDefinedFunctionConfig( transformerclient mgmtv1alpha1connect.TransformersServiceClient, t *mgmtv1alpha1.JobMappingTransformer, ) (*mgmtv1alpha1.JobMappingTransformer, error) { - transformer, err := transformerclient.GetUserDefinedTransformerById(ctx, connect.NewRequest(&mgmtv1alpha1.GetUserDefinedTransformerByIdRequest{TransformerId: t.Config.GetUserDefinedTransformerConfig().Id})) + transformerResp, err := transformerclient.GetUserDefinedTransformerById(ctx, connect.NewRequest(&mgmtv1alpha1.GetUserDefinedTransformerByIdRequest{TransformerId: t.Config.GetUserDefinedTransformerConfig().Id})) if err != nil { return nil, err } + transformer := transformerResp.Msg.GetTransformer() return &mgmtv1alpha1.JobMappingTransformer{ - Source: transformer.Msg.Transformer.Source, - Config: transformer.Msg.Transformer.Config, + Source: transformer.GetSource(), + Config: transformer.GetConfig(), }, nil } @@ -434,56 +446,56 @@ func computeMutationFunction(col *mgmtv1alpha1.JobMapping, colInfo *sqlmanager_s formattedColPath := getBenthosColumnKey(col.Column, splitColumnPath) config := col.GetTransformer().GetConfig() - switch col.Transformer.Source { - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_CATEGORICAL: + switch config.GetConfig().(type) { + case *mgmtv1alpha1.TransformerConfig_GenerateCategoricalConfig: opts, err := transformers.NewGenerateCategoricalOptsFromConfig(config.GetGenerateCategoricalConfig()) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_EMAIL: + case *mgmtv1alpha1.TransformerConfig_GenerateEmailConfig: opts, err := transformers.NewGenerateEmailOptsFromConfig(config.GetGenerateEmailConfig(), &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_EMAIL: + case *mgmtv1alpha1.TransformerConfig_TransformEmailConfig: opts, err := transformers.NewTransformEmailOptsFromConfig(config.GetTransformEmailConfig(), &maxLen) if err != nil { return "", nil } return opts.BuildBloblangString(formattedColPath), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_BOOL: + case *mgmtv1alpha1.TransformerConfig_GenerateBoolConfig: opts, err := transformers.NewGenerateBoolOptsFromConfig(config.GetGenerateBoolConfig()) if err != nil { return "", nil } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_CARD_NUMBER: + case *mgmtv1alpha1.TransformerConfig_GenerateCardNumberConfig: opts, err := transformers.NewGenerateCardNumberOptsFromConfig(config.GetGenerateCardNumberConfig()) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_CITY: + case *mgmtv1alpha1.TransformerConfig_GenerateCityConfig: opts, err := transformers.NewGenerateCityOptsFromConfig(config.GetGenerateCityConfig(), &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_E164_PHONE_NUMBER: + case *mgmtv1alpha1.TransformerConfig_GenerateE164PhoneNumberConfig: opts, err := transformers.NewGenerateInternationalPhoneNumberOptsFromConfig(config.GetGenerateE164PhoneNumberConfig()) if err != nil { return "", nil } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_FIRST_NAME: + case *mgmtv1alpha1.TransformerConfig_GenerateFirstNameConfig: opts, err := transformers.NewGenerateFirstNameOptsFromConfig(config.GetGenerateFirstNameConfig(), &maxLen) if err != nil { return "", nil } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_FLOAT64: + case *mgmtv1alpha1.TransformerConfig_GenerateFloat64Config: var precision *int64 if config != nil && config.GetGenerateFloat64Config() != nil && config.GetGenerateFloat64Config().GetPrecision() > 0 { userDefinedPrecision := config.GetGenerateFloat64Config().GetPrecision() @@ -509,122 +521,122 @@ func computeMutationFunction(col *mgmtv1alpha1.JobMapping, colInfo *sqlmanager_s return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_FULL_ADDRESS: + case *mgmtv1alpha1.TransformerConfig_GenerateFullAddressConfig: opts, err := transformers.NewGenerateFullAddressOptsFromConfig(config.GetGenerateFullAddressConfig(), &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_FULL_NAME: + case *mgmtv1alpha1.TransformerConfig_GenerateFullNameConfig: opts, err := transformers.NewGenerateFullNameOptsFromConfig(config.GetGenerateFullNameConfig(), &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_GENDER: + case *mgmtv1alpha1.TransformerConfig_GenerateGenderConfig: opts, err := transformers.NewGenerateGenderOptsFromConfig(config.GetGenerateGenderConfig(), &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_INT64_PHONE_NUMBER: + case *mgmtv1alpha1.TransformerConfig_GenerateInt64PhoneNumberConfig: opts, err := transformers.NewGenerateInt64PhoneNumberOptsFromConfig(config.GetGenerateInt64PhoneNumberConfig()) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_INT64: + case *mgmtv1alpha1.TransformerConfig_GenerateInt64Config: opts, err := transformers.NewGenerateInt64OptsFromConfig(config.GetGenerateInt64Config()) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_LAST_NAME: + case *mgmtv1alpha1.TransformerConfig_GenerateLastNameConfig: opts, err := transformers.NewGenerateLastNameOptsFromConfig(config.GetGenerateLastNameConfig(), &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_SHA256HASH: + case *mgmtv1alpha1.TransformerConfig_GenerateSha256HashConfig: opts, err := transformers.NewGenerateSHA256HashOptsFromConfig(config.GetGenerateSha256HashConfig()) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_SSN: + case *mgmtv1alpha1.TransformerConfig_GenerateSsnConfig: opts, err := transformers.NewGenerateSSNOptsFromConfig(config.GetGenerateSsnConfig()) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_STATE: + case *mgmtv1alpha1.TransformerConfig_GenerateStateConfig: opts, err := transformers.NewGenerateStateOptsFromConfig(config.GetGenerateStateConfig()) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_STREET_ADDRESS: + case *mgmtv1alpha1.TransformerConfig_GenerateStreetAddressConfig: opts, err := transformers.NewGenerateStreetAddressOptsFromConfig(config.GetGenerateStreetAddressConfig(), &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_STRING_PHONE_NUMBER: + case *mgmtv1alpha1.TransformerConfig_GenerateStringPhoneNumberConfig: opts, err := transformers.NewGenerateStringPhoneNumberOptsFromConfig(config.GetGenerateStringPhoneNumberConfig()) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_RANDOM_STRING: + case *mgmtv1alpha1.TransformerConfig_GenerateStringConfig: // todo: we need to pull in the min from the database schema opts, err := transformers.NewGenerateRandomStringOptsFromConfig(config.GetGenerateStringConfig(), &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_UNIXTIMESTAMP: + case *mgmtv1alpha1.TransformerConfig_GenerateUnixtimestampConfig: opts, err := transformers.NewGenerateUnixTimestampOptsFromConfig(config.GetGenerateUnixtimestampConfig()) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_USERNAME: + case *mgmtv1alpha1.TransformerConfig_GenerateUsernameConfig: opts, err := transformers.NewGenerateUsernameOptsFromConfig(config.GetGenerateUsernameConfig(), &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_UTCTIMESTAMP: + case *mgmtv1alpha1.TransformerConfig_GenerateUtctimestampConfig: opts, err := transformers.NewGenerateUTCTimestampOptsFromConfig(config.GetGenerateUtctimestampConfig()) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_UUID: + case *mgmtv1alpha1.TransformerConfig_GenerateUuidConfig: opts, err := transformers.NewGenerateUUIDOptsFromConfig(config.GetGenerateUuidConfig()) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_ZIPCODE: + case *mgmtv1alpha1.TransformerConfig_GenerateZipcodeConfig: opts, err := transformers.NewGenerateZipcodeOptsFromConfig(config.GetGenerateZipcodeConfig()) if err != nil { return "", err } return opts.BuildBloblangString(), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_E164_PHONE_NUMBER: + case *mgmtv1alpha1.TransformerConfig_TransformE164PhoneNumberConfig: opts, err := transformers.NewTransformE164PhoneNumberOptsFromConfig(config.GetTransformE164PhoneNumberConfig(), &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_FIRST_NAME: + case *mgmtv1alpha1.TransformerConfig_TransformFirstNameConfig: opts, err := transformers.NewTransformFirstNameOptsFromConfig(config.GetTransformFirstNameConfig(), &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_FLOAT64: + case *mgmtv1alpha1.TransformerConfig_TransformFloat64Config: var precision *int64 if colInfo != nil && colInfo.NumericPrecision != nil && *colInfo.NumericPrecision > 0 { newPrecision := int64(*colInfo.NumericPrecision) @@ -640,54 +652,54 @@ func computeMutationFunction(col *mgmtv1alpha1.JobMapping, colInfo *sqlmanager_s return "", err } return opts.BuildBloblangString(formattedColPath), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_FULL_NAME: + case *mgmtv1alpha1.TransformerConfig_TransformFullNameConfig: opts, err := transformers.NewTransformFullNameOptsFromConfig(config.GetTransformFullNameConfig(), &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_INT64_PHONE_NUMBER: + case *mgmtv1alpha1.TransformerConfig_TransformInt64PhoneNumberConfig: opts, err := transformers.NewTransformInt64PhoneNumberOptsFromConfig(config.GetTransformInt64PhoneNumberConfig()) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_INT64: + case *mgmtv1alpha1.TransformerConfig_TransformInt64Config: opts, err := transformers.NewTransformInt64OptsFromConfig(config.GetTransformInt64Config()) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_LAST_NAME: + case *mgmtv1alpha1.TransformerConfig_TransformLastNameConfig: opts, err := transformers.NewTransformLastNameOptsFromConfig(config.GetTransformLastNameConfig(), &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_PHONE_NUMBER: + case *mgmtv1alpha1.TransformerConfig_TransformPhoneNumberConfig: opts, err := transformers.NewTransformStringPhoneNumberOptsFromConfig(config.GetTransformPhoneNumberConfig(), &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_STRING: + case *mgmtv1alpha1.TransformerConfig_TransformStringConfig: minLength := int64(3) // todo: we need to pull in this value from the database schema opts, err := transformers.NewTransformStringOptsFromConfig(config.GetTransformStringConfig(), &minLength, &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_NULL: + case *mgmtv1alpha1.TransformerConfig_Nullconfig: return shared.NullString, nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT: + case *mgmtv1alpha1.TransformerConfig_GenerateDefaultConfig: return `"DEFAULT"`, nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_CHARACTER_SCRAMBLE: + case *mgmtv1alpha1.TransformerConfig_TransformCharacterScrambleConfig: opts, err := transformers.NewTransformCharacterScrambleOptsFromConfig(config.GetTransformCharacterScrambleConfig()) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_COUNTRY: + case *mgmtv1alpha1.TransformerConfig_GenerateCountryConfig: opts, err := transformers.NewGenerateCountryOptsFromConfig(config.GetGenerateCountryConfig()) if err != nil { return "", err diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors_test.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors_test.go index 4dd9f1d183..41e556e8d1 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors_test.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors_test.go @@ -63,7 +63,7 @@ func Test_buildProcessorConfigsJavascript(t *testing.T) { []*mgmtv1alpha1.JobMapping{ { Schema: "public", Table: "users", Column: "address", - Transformer: &mgmtv1alpha1.JobMappingTransformer{Source: jsT.Source, Config: jsT.Config}, + Transformer: &mgmtv1alpha1.JobMappingTransformer{Config: jsT.Config}, }}, map[string]*sqlmanager_shared.ColumnInfo{}, map[string][]*referenceKey{}, []string{}, mockJobId, mockRunId, nil, @@ -123,7 +123,7 @@ func Test_buildProcessorConfigsGenerateJavascript(t *testing.T) { ctx, mockTransformerClient, []*mgmtv1alpha1.JobMapping{ {Schema: "public", Table: "users", Column: "test", - Transformer: &mgmtv1alpha1.JobMappingTransformer{Source: jsT.Source, Config: jsT.Config}, + Transformer: &mgmtv1alpha1.JobMappingTransformer{Config: jsT.Config}, }}, map[string]*sqlmanager_shared.ColumnInfo{}, map[string][]*referenceKey{}, []string{}, mockJobId, mockRunId, nil, @@ -194,8 +194,8 @@ func Test_buildProcessorConfigsJavascriptMultiple(t *testing.T) { res, err := buildProcessorConfigs( ctx, mockTransformerClient, []*mgmtv1alpha1.JobMapping{ - {Schema: "public", Table: "users", Column: nameCol, Transformer: &mgmtv1alpha1.JobMappingTransformer{Source: jsT.Source, Config: jsT.Config}}, - {Schema: "public", Table: "users", Column: ageCol, Transformer: &mgmtv1alpha1.JobMappingTransformer{Source: jsT2.Source, Config: jsT2.Config}}}, + {Schema: "public", Table: "users", Column: nameCol, Transformer: &mgmtv1alpha1.JobMappingTransformer{Config: jsT.Config}}, + {Schema: "public", Table: "users", Column: ageCol, Transformer: &mgmtv1alpha1.JobMappingTransformer{Config: jsT2.Config}}}, map[string]*sqlmanager_shared.ColumnInfo{}, map[string][]*referenceKey{}, []string{}, mockJobId, mockRunId, nil, tabledependency.NewRunConfig("", tabledependency.RunTypeInsert, nil, nil, nil, []string{nameCol, ageCol}, nil, false), nil, @@ -265,8 +265,8 @@ func Test_buildProcessorConfigsTransformAndGenerateJavascript(t *testing.T) { res, err := buildProcessorConfigs( ctx, mockTransformerClient, []*mgmtv1alpha1.JobMapping{ - {Schema: "public", Table: "users", Column: nameCol, Transformer: &mgmtv1alpha1.JobMappingTransformer{Source: jsT.Source, Config: jsT.Config}}, - {Schema: "public", Table: "users", Column: col2, Transformer: &mgmtv1alpha1.JobMappingTransformer{Source: jsT2.Source, Config: jsT2.Config}}}, + {Schema: "public", Table: "users", Column: nameCol, Transformer: &mgmtv1alpha1.JobMappingTransformer{Config: jsT.Config}}, + {Schema: "public", Table: "users", Column: col2, Transformer: &mgmtv1alpha1.JobMappingTransformer{Config: jsT2.Config}}}, map[string]*sqlmanager_shared.ColumnInfo{}, map[string][]*referenceKey{}, []string{}, mockJobId, mockRunId, nil, tabledependency.NewRunConfig("", tabledependency.RunTypeInsert, nil, nil, nil, []string{nameCol, col2}, nil, false), nil, @@ -325,7 +325,7 @@ func Test_buildProcessorConfigsJavascript_DeepKeys(t *testing.T) { []*mgmtv1alpha1.JobMapping{ { Schema: "public", Table: "users", Column: "foo.bar.baz", - Transformer: &mgmtv1alpha1.JobMappingTransformer{Source: jsT.Source, Config: jsT.Config}, + Transformer: &mgmtv1alpha1.JobMappingTransformer{Config: jsT.Config}, }}, map[string]*sqlmanager_shared.ColumnInfo{}, map[string][]*referenceKey{}, []string{}, mockJobId, mockRunId, nil, diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync.go index 400fc812cd..10d6969f37 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync.go @@ -134,7 +134,7 @@ func filterForeignKeysMap( } for i, c := range fk.Columns { t, ok := cols[c] - if !fk.NotNullable[i] && (!ok || t.GetSource() == mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_NULL) { + if !fk.NotNullable[i] && (!ok || isNullJobMappingTransformer(t)) { continue } @@ -151,6 +151,24 @@ func filterForeignKeysMap( return newFkMap } +func isNullJobMappingTransformer(t *mgmtv1alpha1.JobMappingTransformer) bool { + if t == nil { + return false + } + isNullSource := t.GetSource() == mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_NULL + isNullConfig := t.GetConfig().GetNullconfig() + return isNullSource || isNullConfig +} + +func isDefaultJobMappingTransformer(t *mgmtv1alpha1.JobMappingTransformer) bool { + if t == nil { + return false + } + isDefSource := t.GetSource() == mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT + isDefConfig := t.GetConfig().GetGenerateDefaultConfig() + return isDefSource || isDefConfig +} + func mergeVirtualForeignKeys( dbForeignKeys map[string][]*sqlmanager_shared.ForeignConstraint, virtualForeignKeys []*mgmtv1alpha1.VirtualForeignConstraint, @@ -357,12 +375,12 @@ func getColumnDefaultProperties( return nil, err } - transformer, ok := colTransformers[cName] + jmTransformer, ok := colTransformers[cName] if !ok { return nil, fmt.Errorf("transformer missing for column: %s", cName) } var hasDefaultTransformer bool - if transformer != nil && transformer.Source == mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT { + if jmTransformer != nil && isDefaultJobMappingTransformer(jmTransformer) { hasDefaultTransformer = true } if !needsReset && !needsOverride && !hasDefaultTransformer { diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync_test.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync_test.go index e0230529fd..43ea54770b 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync_test.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync_test.go @@ -45,9 +45,9 @@ func TestFilterForeignKeysMap(t *testing.T) { name: "Filtered composite foreign keys", colTransformerMap: map[string]map[string]*mgmtv1alpha1.JobMappingTransformer{ "table1": { - "col1": &mgmtv1alpha1.JobMappingTransformer{Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_NULL}, - "col2": &mgmtv1alpha1.JobMappingTransformer{Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_NULL}, - "col3": &mgmtv1alpha1.JobMappingTransformer{Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_NULL}, + "col1": &mgmtv1alpha1.JobMappingTransformer{Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_Nullconfig{}}}, + "col2": &mgmtv1alpha1.JobMappingTransformer{Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_Nullconfig{}}}, + "col3": &mgmtv1alpha1.JobMappingTransformer{Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_Nullconfig{}}}, }, }, foreignKeysMap: map[string][]*sqlmanager_shared.ForeignConstraint{ @@ -73,10 +73,10 @@ func TestFilterForeignKeysMap(t *testing.T) { name: "Filtered foreign keys", colTransformerMap: map[string]map[string]*mgmtv1alpha1.JobMappingTransformer{ "table1": { - "col1": &mgmtv1alpha1.JobMappingTransformer{Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH}, + "col1": &mgmtv1alpha1.JobMappingTransformer{Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}}}, }, "table2": { - "col2": &mgmtv1alpha1.JobMappingTransformer{Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_NULL}, + "col2": &mgmtv1alpha1.JobMappingTransformer{Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_Nullconfig{}}}, }, }, foreignKeysMap: map[string][]*sqlmanager_shared.ForeignConstraint{ diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/utils.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/utils.go index 5da2ccc3ac..b4094d8da4 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/utils.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/utils.go @@ -35,15 +35,31 @@ func buildPlainColumns(mappings []*mgmtv1alpha1.JobMapping) []string { } func shouldProcessColumn(t *mgmtv1alpha1.JobMappingTransformer) bool { - return t != nil && - t.Source != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_UNSPECIFIED && - t.Source != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH + if t == nil { + return false + } + + source := + t.GetSource() != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_UNSPECIFIED && + t.GetSource() != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH + config := t.GetConfig().GetConfig() + passthrough := t.GetConfig().GetPassthroughConfig() + return source || (config != nil && passthrough == nil) } func shouldProcessStrict(t *mgmtv1alpha1.JobMappingTransformer) bool { - return t != nil && - t.Source != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_UNSPECIFIED && - t.Source != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_NULL && - t.Source != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH && - t.Source != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT + if t == nil { + return false + } + + source := + t.GetSource() != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_UNSPECIFIED && + t.GetSource() != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_NULL && + t.GetSource() != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH && + t.GetSource() != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT + config := t.GetConfig().GetConfig() + genNull := t.GetConfig().GetNullconfig() + passthrough := t.GetConfig().GetPassthroughConfig() + genDefault := t.GetConfig().GetGenerateDefaultConfig() + return source || (config != nil && genNull == nil && passthrough == nil && genDefault == nil) } diff --git a/worker/pkg/workflows/datasync/activities/run-sql-init-table-stmts/init-statement-builder_test.go b/worker/pkg/workflows/datasync/activities/run-sql-init-table-stmts/init-statement-builder_test.go index e73d480a5a..19a2025297 100644 --- a/worker/pkg/workflows/datasync/activities/run-sql-init-table-stmts/init-statement-builder_test.go +++ b/worker/pkg/workflows/datasync/activities/run-sql-init-table-stmts/init-statement-builder_test.go @@ -53,7 +53,6 @@ func Test_InitStatementBuilder_Pg_Generate_InitSchema(t *testing.T) { Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_UUID, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUuidConfig{ GenerateUuidConfig: &mgmtv1alpha1.GenerateUuid{ @@ -68,7 +67,6 @@ func Test_InitStatementBuilder_Pg_Generate_InitSchema(t *testing.T) { Table: "users", Column: "name", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_FULL_NAME, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFullNameConfig{ GenerateFullNameConfig: &mgmtv1alpha1.GenerateFullName{}, @@ -201,7 +199,6 @@ func Test_InitStatementBuilder_Pg_Generate_NoInitStatement(t *testing.T) { Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_UUID, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUuidConfig{ GenerateUuidConfig: &mgmtv1alpha1.GenerateUuid{ @@ -216,7 +213,6 @@ func Test_InitStatementBuilder_Pg_Generate_NoInitStatement(t *testing.T) { Table: "users", Column: "name", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_FULL_NAME, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFullNameConfig{ GenerateFullNameConfig: &mgmtv1alpha1.GenerateFullName{}, @@ -304,7 +300,6 @@ func Test_InitStatementBuilder_Pg_TruncateCascade(t *testing.T) { Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_UUID, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUuidConfig{ GenerateUuidConfig: &mgmtv1alpha1.GenerateUuid{ @@ -319,7 +314,6 @@ func Test_InitStatementBuilder_Pg_TruncateCascade(t *testing.T) { Table: "users", Column: "name", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_FULL_NAME, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFullNameConfig{ GenerateFullNameConfig: &mgmtv1alpha1.GenerateFullName{}, @@ -332,7 +326,6 @@ func Test_InitStatementBuilder_Pg_TruncateCascade(t *testing.T) { Table: "accounts", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_UUID, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUuidConfig{ GenerateUuidConfig: &mgmtv1alpha1.GenerateUuid{ @@ -436,7 +429,6 @@ func Test_InitStatementBuilder_Pg_Truncate(t *testing.T) { Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_UUID, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUuidConfig{ GenerateUuidConfig: &mgmtv1alpha1.GenerateUuid{ @@ -451,7 +443,6 @@ func Test_InitStatementBuilder_Pg_Truncate(t *testing.T) { Table: "users", Column: "name", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_FULL_NAME, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFullNameConfig{ GenerateFullNameConfig: &mgmtv1alpha1.GenerateFullName{}, @@ -464,7 +455,6 @@ func Test_InitStatementBuilder_Pg_Truncate(t *testing.T) { Table: "accounts", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_UUID, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUuidConfig{ GenerateUuidConfig: &mgmtv1alpha1.GenerateUuid{ @@ -576,7 +566,6 @@ func Test_InitStatementBuilder_Pg_InitSchema(t *testing.T) { Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_UUID, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUuidConfig{ GenerateUuidConfig: &mgmtv1alpha1.GenerateUuid{ @@ -591,7 +580,6 @@ func Test_InitStatementBuilder_Pg_InitSchema(t *testing.T) { Table: "users", Column: "name", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_FULL_NAME, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFullNameConfig{ GenerateFullNameConfig: &mgmtv1alpha1.GenerateFullName{}, @@ -604,7 +592,6 @@ func Test_InitStatementBuilder_Pg_InitSchema(t *testing.T) { Table: "accounts", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_UUID, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUuidConfig{ GenerateUuidConfig: &mgmtv1alpha1.GenerateUuid{ @@ -719,7 +706,6 @@ func Test_InitStatementBuilder_Mysql_Generate(t *testing.T) { Table: "users", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_UUID, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUuidConfig{ GenerateUuidConfig: &mgmtv1alpha1.GenerateUuid{ @@ -734,7 +720,6 @@ func Test_InitStatementBuilder_Mysql_Generate(t *testing.T) { Table: "users", Column: "name", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_FULL_NAME, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFullNameConfig{ GenerateFullNameConfig: &mgmtv1alpha1.GenerateFullName{}, diff --git a/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go b/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go index 5cf2cf2c73..c2e2e005f6 100644 --- a/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go +++ b/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go @@ -464,7 +464,6 @@ func (s *IntegrationTestSuite) Test_Workflow_VirtualForeignKeys_Transform() { for _, m := range jobmappings { if m.Table == "countries" && m.Column == "country_id" { m.Transformer = &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformJavascriptConfig{ TransformJavascriptConfig: &mgmtv1alpha1.TransformJavascript{Code: `if (value == 'US') { return 'SU'; } return value;`}, @@ -989,7 +988,6 @@ func getAllDynamoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { Table: "test-sync-source", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}, }, @@ -1000,7 +998,6 @@ func getAllDynamoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { Table: "test-sync-source", Column: "a", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}, }, @@ -1021,7 +1018,6 @@ func getAllDynamoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { Table: "test-sync-source", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}, }, @@ -1032,7 +1028,6 @@ func getAllDynamoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { Table: "test-sync-source", Column: "a", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}, }, @@ -1055,7 +1050,6 @@ func getAllDynamoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { Table: "test-sync-source", Column: "id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}, }, @@ -1066,7 +1060,6 @@ func getAllDynamoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { Table: "test-sync-source", Column: "a", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}, }, @@ -1076,13 +1069,11 @@ func getAllDynamoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { JobOptions: &workflow_testdata.TestJobOptions{ DefaultTransformers: &workflow_testdata.DefaultTransformers{ Boolean: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_BOOL, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateBoolConfig{}, }, }, Number: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_INT64, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformInt64Config{ TransformInt64Config: &mgmtv1alpha1.TransformInt64{RandomizationRangeMin: gotypeutil.ToPtr(int64(10)), RandomizationRangeMax: gotypeutil.ToPtr(int64(1000))}, @@ -1090,7 +1081,6 @@ func getAllDynamoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { }, }, String: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_STRING, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformStringConfig{ TransformStringConfig: &mgmtv1alpha1.TransformString{}, @@ -1098,8 +1088,9 @@ func getAllDynamoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { }, }, Byte: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{ + Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}, + }, }, }, }, @@ -1287,7 +1278,6 @@ func getAllMongoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { Table: "test-sync", Column: "string", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}, }, @@ -1298,7 +1288,6 @@ func getAllMongoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { Table: "test-sync", Column: "bool", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{}, }, @@ -1318,7 +1307,6 @@ func getAllMongoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { Table: "test-sync", Column: "string", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_STRING, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformStringConfig{ TransformStringConfig: &mgmtv1alpha1.TransformString{ @@ -1333,7 +1321,6 @@ func getAllMongoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { Table: "test-sync", Column: "embedded_document.name", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_FIRST_NAME, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFirstNameConfig{ GenerateFirstNameConfig: &mgmtv1alpha1.GenerateFirstName{}, @@ -1346,7 +1333,6 @@ func getAllMongoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { Table: "test-sync", Column: "decimal128", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_FLOAT64, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformFloat64Config{ TransformFloat64Config: &mgmtv1alpha1.TransformFloat64{ @@ -1362,7 +1348,6 @@ func getAllMongoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { Table: "test-sync", Column: "int64", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_INT64, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformInt64Config{ TransformInt64Config: &mgmtv1alpha1.TransformInt64{ @@ -1378,7 +1363,6 @@ func getAllMongoDBSyncTests() map[string][]*workflow_testdata.IntegrationTest { Table: "test-sync", Column: "timestamp", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_UNIXTIMESTAMP, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUnixtimestampConfig{ GenerateUnixtimestampConfig: &mgmtv1alpha1.GenerateUnixTimestamp{}, @@ -1453,11 +1437,9 @@ func (s *IntegrationTestSuite) Test_Workflow_Generate() { }, Mappings: []*mgmtv1alpha1.JobMapping{ {Schema: schema, Table: table, Column: "region_id", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT, - Config: &mgmtv1alpha1.TransformerConfig{}, + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_GenerateDefaultConfig{}}, }}, {Schema: schema, Table: table, Column: "region_name", Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_CITY, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateCityConfig{ GenerateCityConfig: &mgmtv1alpha1.GenerateCity{}, From 1366f5eb3eb61acca2a2bfb6f92a1affc1d0770e Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Fri, 25 Oct 2024 15:29:57 -0700 Subject: [PATCH 2/6] cleanup --- backend/gen/go/db/models.go | 2 +- .../gen-benthos-configs/benthos-builder_test.go | 9 --------- .../datasync/activities/gen-benthos-configs/sync.go | 4 ++-- 3 files changed, 3 insertions(+), 12 deletions(-) diff --git a/backend/gen/go/db/models.go b/backend/gen/go/db/models.go index 9352a6e23f..56c5af8ad9 100644 --- a/backend/gen/go/db/models.go +++ b/backend/gen/go/db/models.go @@ -109,7 +109,7 @@ type NeosyncApiTransformer struct { Name string Description string AccountID pgtype.UUID - TransformerConfig *pg_models.TransformerConfigs + TransformerConfig *pg_models.TransformerConfig CreatedByID pgtype.UUID UpdatedByID pgtype.UUID Source int32 diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go index 9c0842b54f..d1b6b34d4c 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go @@ -8,14 +8,12 @@ import ( "testing" "connectrpc.com/connect" - db_queries "github.com/nucleuscloud/neosync/backend/gen/go/db" mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect" sqlmanager_mssql "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/mssql" sqlmanager_postgres "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/postgres" sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared" tabledependency "github.com/nucleuscloud/neosync/backend/pkg/table-dependency" - pg_models "github.com/nucleuscloud/neosync/backend/sql/postgresql/models" "github.com/nucleuscloud/neosync/internal/gotypeutil" "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared" "github.com/stretchr/testify/mock" @@ -516,13 +514,6 @@ func Test_convertUserDefinedFunctionConfig(t *testing.T) { require.Equal(t, resp, expected) } -func MockJobMappingTransformer(source int32, transformerId string) db_queries.NeosyncApiTransformer { - return db_queries.NeosyncApiTransformer{ - Source: source, - TransformerConfig: &pg_models.TransformerConfig{}, - } -} - func Test_buildPlainInsertArgs(t *testing.T) { require.Empty(t, buildPlainInsertArgs(nil)) require.Empty(t, buildPlainInsertArgs([]string{})) diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync.go index 10d6969f37..1b7d6c93a6 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync.go @@ -156,7 +156,7 @@ func isNullJobMappingTransformer(t *mgmtv1alpha1.JobMappingTransformer) bool { return false } isNullSource := t.GetSource() == mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_NULL - isNullConfig := t.GetConfig().GetNullconfig() + isNullConfig := t.GetConfig().GetNullconfig() != nil return isNullSource || isNullConfig } @@ -165,7 +165,7 @@ func isDefaultJobMappingTransformer(t *mgmtv1alpha1.JobMappingTransformer) bool return false } isDefSource := t.GetSource() == mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT - isDefConfig := t.GetConfig().GetGenerateDefaultConfig() + isDefConfig := t.GetConfig().GetGenerateDefaultConfig() != nil return isDefSource || isDefConfig } From 8cadae9648f31da2b148d03b8a171cbf5158b4df Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Fri, 25 Oct 2024 15:30:49 -0700 Subject: [PATCH 3/6] fixes sqlc generator --- sqlc.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlc.yaml b/sqlc.yaml index a50aa32bce..deb5c67bb2 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -47,7 +47,7 @@ sql: go_type: import: github.com/nucleuscloud/neosync/backend/sql/postgresql/models package: pg_models - type: TransformerConfigs + type: TransformerConfig pointer: true - column: neosync_api.accounts.temporal_config go_type: From a09e4dfafb16c8e3914e2fedf37770df55480c92 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Fri, 25 Oct 2024 15:48:37 -0700 Subject: [PATCH 4/6] fixes todo, makes processor more type safe --- .../jobs/SchemaTable/SchemaTableToolBar.tsx | 10 +-- .../jobs/SchemaTable/transformer-handler.ts | 15 ++-- .../gen-benthos-configs/processors.go | 86 +++++++++---------- 3 files changed, 56 insertions(+), 55 deletions(-) diff --git a/frontend/apps/web/components/jobs/SchemaTable/SchemaTableToolBar.tsx b/frontend/apps/web/components/jobs/SchemaTable/SchemaTableToolBar.tsx index 62b8745e80..303ebcce6a 100644 --- a/frontend/apps/web/components/jobs/SchemaTable/SchemaTableToolBar.tsx +++ b/frontend/apps/web/components/jobs/SchemaTable/SchemaTableToolBar.tsx @@ -328,10 +328,10 @@ function getFilteredTransformersForBulkSet( userDefinedArrays.push(userDefined); }); - const uniqueSystemSources = findCommonSystemSources(systemArrays); - const uniqueSystem = uniqueSystemSources - .map((source) => - transformerHandler.getSystemTransformerByConfigCase(source) + const uniqueSystemConfigCases = findCommonSystemConfigCases(systemArrays); + const uniqueSystem = uniqueSystemConfigCases + .map((configCase) => + transformerHandler.getSystemTransformerByConfigCase(configCase) ) .filter((x): x is SystemTransformer => !!x); @@ -346,7 +346,7 @@ function getFilteredTransformersForBulkSet( }; } -function findCommonSystemSources( +function findCommonSystemConfigCases( arrays: SystemTransformer[][] ): TransformerConfigCase[] { const elementCount: Record = {} as Record< diff --git a/frontend/apps/web/components/jobs/SchemaTable/transformer-handler.ts b/frontend/apps/web/components/jobs/SchemaTable/transformer-handler.ts index ba8934edce..4fa4df67e1 100644 --- a/frontend/apps/web/components/jobs/SchemaTable/transformer-handler.ts +++ b/frontend/apps/web/components/jobs/SchemaTable/transformer-handler.ts @@ -21,6 +21,7 @@ export class TransformerHandler { private readonly userDefinedTransformers: UserDefinedTransformer[]; private readonly systemByType: Map; + // used by user-defined to go from User Defined -> System private readonly systemBySource: Map; private readonly userDefinedById: Map; @@ -31,13 +32,13 @@ export class TransformerHandler { this.systemTransformers = systemTransformers; this.userDefinedTransformers = userDefinedTransformers; - this.systemByType = new Map( - // todo - systemTransformers.map((t) => [ - t.config?.config.case ?? 'generateEmailConfig', - t, - ]) - ); + this.systemByType = new Map(); + systemTransformers.forEach((t) => { + if (t.config?.config.case) { + this.systemByType.set(t.config.config.case, t); + } + }); + this.systemBySource = new Map(systemTransformers.map((t) => [t.source, t])); this.userDefinedById = new Map( userDefinedTransformers.map((t) => [t.id, t]) diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors.go index 69d4059d1f..cca147cc86 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors.go @@ -446,67 +446,67 @@ func computeMutationFunction(col *mgmtv1alpha1.JobMapping, colInfo *sqlmanager_s formattedColPath := getBenthosColumnKey(col.Column, splitColumnPath) config := col.GetTransformer().GetConfig() - switch config.GetConfig().(type) { + switch cfg := config.GetConfig().(type) { case *mgmtv1alpha1.TransformerConfig_GenerateCategoricalConfig: - opts, err := transformers.NewGenerateCategoricalOptsFromConfig(config.GetGenerateCategoricalConfig()) + opts, err := transformers.NewGenerateCategoricalOptsFromConfig(cfg.GenerateCategoricalConfig) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateEmailConfig: - opts, err := transformers.NewGenerateEmailOptsFromConfig(config.GetGenerateEmailConfig(), &maxLen) + opts, err := transformers.NewGenerateEmailOptsFromConfig(cfg.GenerateEmailConfig, &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_TransformEmailConfig: - opts, err := transformers.NewTransformEmailOptsFromConfig(config.GetTransformEmailConfig(), &maxLen) + opts, err := transformers.NewTransformEmailOptsFromConfig(cfg.TransformEmailConfig, &maxLen) if err != nil { return "", nil } return opts.BuildBloblangString(formattedColPath), nil case *mgmtv1alpha1.TransformerConfig_GenerateBoolConfig: - opts, err := transformers.NewGenerateBoolOptsFromConfig(config.GetGenerateBoolConfig()) + opts, err := transformers.NewGenerateBoolOptsFromConfig(cfg.GenerateBoolConfig) if err != nil { return "", nil } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateCardNumberConfig: - opts, err := transformers.NewGenerateCardNumberOptsFromConfig(config.GetGenerateCardNumberConfig()) + opts, err := transformers.NewGenerateCardNumberOptsFromConfig(cfg.GenerateCardNumberConfig) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateCityConfig: - opts, err := transformers.NewGenerateCityOptsFromConfig(config.GetGenerateCityConfig(), &maxLen) + opts, err := transformers.NewGenerateCityOptsFromConfig(cfg.GenerateCityConfig, &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateE164PhoneNumberConfig: - opts, err := transformers.NewGenerateInternationalPhoneNumberOptsFromConfig(config.GetGenerateE164PhoneNumberConfig()) + opts, err := transformers.NewGenerateInternationalPhoneNumberOptsFromConfig(cfg.GenerateE164PhoneNumberConfig) if err != nil { return "", nil } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateFirstNameConfig: - opts, err := transformers.NewGenerateFirstNameOptsFromConfig(config.GetGenerateFirstNameConfig(), &maxLen) + opts, err := transformers.NewGenerateFirstNameOptsFromConfig(cfg.GenerateFirstNameConfig, &maxLen) if err != nil { return "", nil } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateFloat64Config: var precision *int64 - if config != nil && config.GetGenerateFloat64Config() != nil && config.GetGenerateFloat64Config().GetPrecision() > 0 { - userDefinedPrecision := config.GetGenerateFloat64Config().GetPrecision() + if cfg.GenerateFloat64Config.GetPrecision() > 0 { + userDefinedPrecision := cfg.GenerateFloat64Config.GetPrecision() precision = &userDefinedPrecision - config.GetGenerateFloat64Config().Precision = &userDefinedPrecision + cfg.GenerateFloat64Config.Precision = &userDefinedPrecision } if colInfo != nil && colInfo.NumericPrecision != nil && *colInfo.NumericPrecision > 0 { newPrecision := transformer_utils.Ceil(*precision, int64(*colInfo.NumericPrecision)) precision = &newPrecision } - if config != nil && config.GetGenerateFloat64Config() != nil && precision != nil { + if precision != nil { config.GetGenerateFloat64Config().Precision = precision } @@ -516,122 +516,122 @@ func computeMutationFunction(col *mgmtv1alpha1.JobMapping, colInfo *sqlmanager_s scale = &newScale } - opts, err := transformers.NewGenerateFloat64OptsFromConfig(col.Transformer.Config.GetGenerateFloat64Config(), scale) + opts, err := transformers.NewGenerateFloat64OptsFromConfig(cfg.GenerateFloat64Config, scale) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateFullAddressConfig: - opts, err := transformers.NewGenerateFullAddressOptsFromConfig(config.GetGenerateFullAddressConfig(), &maxLen) + opts, err := transformers.NewGenerateFullAddressOptsFromConfig(cfg.GenerateFullAddressConfig, &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateFullNameConfig: - opts, err := transformers.NewGenerateFullNameOptsFromConfig(config.GetGenerateFullNameConfig(), &maxLen) + opts, err := transformers.NewGenerateFullNameOptsFromConfig(cfg.GenerateFullNameConfig, &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateGenderConfig: - opts, err := transformers.NewGenerateGenderOptsFromConfig(config.GetGenerateGenderConfig(), &maxLen) + opts, err := transformers.NewGenerateGenderOptsFromConfig(cfg.GenerateGenderConfig, &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateInt64PhoneNumberConfig: - opts, err := transformers.NewGenerateInt64PhoneNumberOptsFromConfig(config.GetGenerateInt64PhoneNumberConfig()) + opts, err := transformers.NewGenerateInt64PhoneNumberOptsFromConfig(cfg.GenerateInt64PhoneNumberConfig) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateInt64Config: - opts, err := transformers.NewGenerateInt64OptsFromConfig(config.GetGenerateInt64Config()) + opts, err := transformers.NewGenerateInt64OptsFromConfig(cfg.GenerateInt64Config) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateLastNameConfig: - opts, err := transformers.NewGenerateLastNameOptsFromConfig(config.GetGenerateLastNameConfig(), &maxLen) + opts, err := transformers.NewGenerateLastNameOptsFromConfig(cfg.GenerateLastNameConfig, &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateSha256HashConfig: - opts, err := transformers.NewGenerateSHA256HashOptsFromConfig(config.GetGenerateSha256HashConfig()) + opts, err := transformers.NewGenerateSHA256HashOptsFromConfig(cfg.GenerateSha256HashConfig) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateSsnConfig: - opts, err := transformers.NewGenerateSSNOptsFromConfig(config.GetGenerateSsnConfig()) + opts, err := transformers.NewGenerateSSNOptsFromConfig(cfg.GenerateSsnConfig) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateStateConfig: - opts, err := transformers.NewGenerateStateOptsFromConfig(config.GetGenerateStateConfig()) + opts, err := transformers.NewGenerateStateOptsFromConfig(cfg.GenerateStateConfig) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateStreetAddressConfig: - opts, err := transformers.NewGenerateStreetAddressOptsFromConfig(config.GetGenerateStreetAddressConfig(), &maxLen) + opts, err := transformers.NewGenerateStreetAddressOptsFromConfig(cfg.GenerateStreetAddressConfig, &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateStringPhoneNumberConfig: - opts, err := transformers.NewGenerateStringPhoneNumberOptsFromConfig(config.GetGenerateStringPhoneNumberConfig()) + opts, err := transformers.NewGenerateStringPhoneNumberOptsFromConfig(cfg.GenerateStringPhoneNumberConfig) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateStringConfig: // todo: we need to pull in the min from the database schema - opts, err := transformers.NewGenerateRandomStringOptsFromConfig(config.GetGenerateStringConfig(), &maxLen) + opts, err := transformers.NewGenerateRandomStringOptsFromConfig(cfg.GenerateStringConfig, &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateUnixtimestampConfig: - opts, err := transformers.NewGenerateUnixTimestampOptsFromConfig(config.GetGenerateUnixtimestampConfig()) + opts, err := transformers.NewGenerateUnixTimestampOptsFromConfig(cfg.GenerateUnixtimestampConfig) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateUsernameConfig: - opts, err := transformers.NewGenerateUsernameOptsFromConfig(config.GetGenerateUsernameConfig(), &maxLen) + opts, err := transformers.NewGenerateUsernameOptsFromConfig(cfg.GenerateUsernameConfig, &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateUtctimestampConfig: - opts, err := transformers.NewGenerateUTCTimestampOptsFromConfig(config.GetGenerateUtctimestampConfig()) + opts, err := transformers.NewGenerateUTCTimestampOptsFromConfig(cfg.GenerateUtctimestampConfig) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateUuidConfig: - opts, err := transformers.NewGenerateUUIDOptsFromConfig(config.GetGenerateUuidConfig()) + opts, err := transformers.NewGenerateUUIDOptsFromConfig(cfg.GenerateUuidConfig) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_GenerateZipcodeConfig: - opts, err := transformers.NewGenerateZipcodeOptsFromConfig(config.GetGenerateZipcodeConfig()) + opts, err := transformers.NewGenerateZipcodeOptsFromConfig(cfg.GenerateZipcodeConfig) if err != nil { return "", err } return opts.BuildBloblangString(), nil case *mgmtv1alpha1.TransformerConfig_TransformE164PhoneNumberConfig: - opts, err := transformers.NewTransformE164PhoneNumberOptsFromConfig(config.GetTransformE164PhoneNumberConfig(), &maxLen) + opts, err := transformers.NewTransformE164PhoneNumberOptsFromConfig(cfg.TransformE164PhoneNumberConfig, &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil case *mgmtv1alpha1.TransformerConfig_TransformFirstNameConfig: - opts, err := transformers.NewTransformFirstNameOptsFromConfig(config.GetTransformFirstNameConfig(), &maxLen) + opts, err := transformers.NewTransformFirstNameOptsFromConfig(cfg.TransformFirstNameConfig, &maxLen) if err != nil { return "", err } @@ -647,44 +647,44 @@ func computeMutationFunction(col *mgmtv1alpha1.JobMapping, colInfo *sqlmanager_s newScale := int64(*colInfo.NumericScale) scale = &newScale } - opts, err := transformers.NewTransformFloat64OptsFromConfig(col.Transformer.Config.GetTransformFloat64Config(), scale, precision) + opts, err := transformers.NewTransformFloat64OptsFromConfig(cfg.TransformFloat64Config, scale, precision) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil case *mgmtv1alpha1.TransformerConfig_TransformFullNameConfig: - opts, err := transformers.NewTransformFullNameOptsFromConfig(config.GetTransformFullNameConfig(), &maxLen) + opts, err := transformers.NewTransformFullNameOptsFromConfig(cfg.TransformFullNameConfig, &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil case *mgmtv1alpha1.TransformerConfig_TransformInt64PhoneNumberConfig: - opts, err := transformers.NewTransformInt64PhoneNumberOptsFromConfig(config.GetTransformInt64PhoneNumberConfig()) + opts, err := transformers.NewTransformInt64PhoneNumberOptsFromConfig(cfg.TransformInt64PhoneNumberConfig) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil case *mgmtv1alpha1.TransformerConfig_TransformInt64Config: - opts, err := transformers.NewTransformInt64OptsFromConfig(config.GetTransformInt64Config()) + opts, err := transformers.NewTransformInt64OptsFromConfig(cfg.TransformInt64Config) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil case *mgmtv1alpha1.TransformerConfig_TransformLastNameConfig: - opts, err := transformers.NewTransformLastNameOptsFromConfig(config.GetTransformLastNameConfig(), &maxLen) + opts, err := transformers.NewTransformLastNameOptsFromConfig(cfg.TransformLastNameConfig, &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil case *mgmtv1alpha1.TransformerConfig_TransformPhoneNumberConfig: - opts, err := transformers.NewTransformStringPhoneNumberOptsFromConfig(config.GetTransformPhoneNumberConfig(), &maxLen) + opts, err := transformers.NewTransformStringPhoneNumberOptsFromConfig(cfg.TransformPhoneNumberConfig, &maxLen) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil case *mgmtv1alpha1.TransformerConfig_TransformStringConfig: minLength := int64(3) // todo: we need to pull in this value from the database schema - opts, err := transformers.NewTransformStringOptsFromConfig(config.GetTransformStringConfig(), &minLength, &maxLen) + opts, err := transformers.NewTransformStringOptsFromConfig(cfg.TransformStringConfig, &minLength, &maxLen) if err != nil { return "", err } @@ -694,18 +694,18 @@ func computeMutationFunction(col *mgmtv1alpha1.JobMapping, colInfo *sqlmanager_s case *mgmtv1alpha1.TransformerConfig_GenerateDefaultConfig: return `"DEFAULT"`, nil case *mgmtv1alpha1.TransformerConfig_TransformCharacterScrambleConfig: - opts, err := transformers.NewTransformCharacterScrambleOptsFromConfig(config.GetTransformCharacterScrambleConfig()) + opts, err := transformers.NewTransformCharacterScrambleOptsFromConfig(cfg.TransformCharacterScrambleConfig) if err != nil { return "", err } return opts.BuildBloblangString(formattedColPath), nil case *mgmtv1alpha1.TransformerConfig_GenerateCountryConfig: - opts, err := transformers.NewGenerateCountryOptsFromConfig(config.GetGenerateCountryConfig()) + opts, err := transformers.NewGenerateCountryOptsFromConfig(cfg.GenerateCountryConfig) if err != nil { return "", err } return opts.BuildBloblangString(), nil default: - return "", fmt.Errorf("unsupported transformer") + return "", fmt.Errorf("unsupported transformer: %T", cfg) } } From bb1bea62128314a766ba5ae9f6ea64c94432acf4 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Fri, 25 Oct 2024 15:52:58 -0700 Subject: [PATCH 5/6] fixes sqlc gen --- backend/gen/go/db/transformers.sql.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/gen/go/db/transformers.sql.go b/backend/gen/go/db/transformers.sql.go index 64d60de2ad..63b4dd707f 100644 --- a/backend/gen/go/db/transformers.sql.go +++ b/backend/gen/go/db/transformers.sql.go @@ -26,7 +26,7 @@ type CreateUserDefinedTransformerParams struct { Description string Source int32 AccountID pgtype.UUID - TransformerConfig *pg_models.TransformerConfigs + TransformerConfig *pg_models.TransformerConfig CreatedByID pgtype.UUID UpdatedByID pgtype.UUID } @@ -158,7 +158,7 @@ RETURNING id, created_at, updated_at, name, description, account_id, transformer type UpdateUserDefinedTransformerParams struct { Name string Description string - TransformerConfig *pg_models.TransformerConfigs + TransformerConfig *pg_models.TransformerConfig UpdatedByID pgtype.UUID ID pgtype.UUID } From 2d086f5d5d4f3f2ccf5913ff4815bf00f00d5810 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Fri, 25 Oct 2024 15:55:52 -0700 Subject: [PATCH 6/6] lint --- .../activities/gen-benthos-configs/benthos-builder_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go index d1b6b34d4c..ec539102cc 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder_test.go @@ -372,7 +372,6 @@ func Test_buildProcessorConfigsMutation(t *testing.T) { require.Equal(t, *output[0].Mutation, "root.\"id\" = null\nroot.\"name\" = null") jsT := mgmtv1alpha1.SystemTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_EMAIL, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformEmailConfig{ TransformEmailConfig: &mgmtv1alpha1.TransformEmail{ @@ -438,7 +437,6 @@ func Test_buildProcessorConfigsJavascriptEmpty(t *testing.T) { ctx := context.Background() jsT := mgmtv1alpha1.SystemTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT, Config: &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformJavascriptConfig{ TransformJavascriptConfig: &mgmtv1alpha1.TransformJavascript{