diff --git a/materialize-starburst/.snapshots/TestSQLGeneration b/materialize-starburst/.snapshots/TestSQLGeneration index 9528f8e071..d101f00dbb 100644 --- a/materialize-starburst/.snapshots/TestSQLGeneration +++ b/materialize-starburst/.snapshots/TestSQLGeneration @@ -20,8 +20,8 @@ CREATE TABLE IF NOT EXISTS "a-schema".target_table ( integer BIGINT, number DOUBLE, string VARCHAR, - datetime VARCHAR, - date VARCHAR, + datetime TIMESTAMP(6) WITH TIME ZONE, + date DATE, flow_document VARCHAR ) COMMENT 'Generated for materialization test/sqlite of collection key/value' @@ -40,10 +40,10 @@ MERGE INTO "a-schema".target_table AS l USING "a-schema".target_table_store_temp AS r ON l.key1 = r.key1 AND l.key2 = r.key2 WHEN MATCHED THEN - UPDATE SET boolean = r.boolean, integer = r.integer, number = r.number, string = r.string, datetime = r.datetime, date = r.date, flow_document = r.flow_document + UPDATE SET boolean = r.boolean, integer = r.integer, number = r.number, string = r.string, datetime = from_iso8601_timestamp_nanos(r.datetime), date = from_iso8601_date(r.date), flow_document = r.flow_document WHEN NOT MATCHED THEN INSERT (key1, key2, boolean, integer, number, string, datetime, date, flow_document) - VALUES (r.key1, r.key2, r.boolean, r.integer, r.number, r.string, r.datetime, r.date, r.flow_document) + VALUES (r.key1, r.key2, r.boolean, r.integer, r.number, r.string, from_iso8601_timestamp_nanos(r.datetime), from_iso8601_date(r.date), r.flow_document) --- End "a-schema".target_table mergeIntoTarget --- --- Begin "a-schema".target_table createLoadTempTable --- diff --git a/materialize-starburst/.snapshots/TestValidateAndApply b/materialize-starburst/.snapshots/TestValidateAndApply index 0c266fa2df..739ca453a3 100644 --- a/materialize-starburst/.snapshots/TestValidateAndApply +++ b/materialize-starburst/.snapshots/TestValidateAndApply @@ -88,8 +88,8 @@ Big Schema Changed Types Constraints: {"Field":"nullField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object 
fields may be materialized"} {"Field":"numField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'numField' is already being materialized as endpoint type 'DOUBLE' but endpoint type 'BOOLEAN' is required by its schema '{ type: [boolean] }'"} {"Field":"objField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"stringDateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"stringDateTimeField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"stringDateField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringDateField' is already being materialized as endpoint type 'DATE' but endpoint type 'VARCHAR' is required by its schema '{ type: [string] }'"} +{"Field":"stringDateTimeField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringDateTimeField' is already being materialized as endpoint type 'TIMESTAMP(6) WITH TIME ZONE' but endpoint type 'VARCHAR' is required by its schema '{ type: [string] }'"} {"Field":"stringDurationField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringEmailField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringField' is already being materialized as endpoint type 'VARCHAR' but endpoint type 'BIGINT' is required by its schema '{ type: [integer] }'"} @@ -120,14 +120,14 @@ Big Schema Materialized Resource Schema With All Fields Required: {"Name":"arrayfield","Nullable":"YES","Type":"varchar"} {"Name":"boolfield","Nullable":"YES","Type":"boolean"} {"Name":"flow_document","Nullable":"YES","Type":"varchar"} -{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"} 
+{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"} {"Name":"intfield","Nullable":"YES","Type":"bigint"} {"Name":"key","Nullable":"YES","Type":"varchar"} {"Name":"multiplefield","Nullable":"YES","Type":"varchar"} {"Name":"numfield","Nullable":"YES","Type":"double"} {"Name":"objfield","Nullable":"YES","Type":"varchar"} -{"Name":"stringdatefield","Nullable":"YES","Type":"varchar"} -{"Name":"stringdatetimefield","Nullable":"YES","Type":"varchar"} +{"Name":"stringdatefield","Nullable":"YES","Type":"date"} +{"Name":"stringdatetimefield","Nullable":"YES","Type":"timestamp(6) with time zone"} {"Name":"stringdurationfield","Nullable":"YES","Type":"varchar"} {"Name":"stringemailfield","Nullable":"YES","Type":"varchar"} {"Name":"stringfield","Nullable":"YES","Type":"varchar"} @@ -158,14 +158,14 @@ Big Schema Materialized Resource Schema With No Fields Required: {"Name":"arrayfield","Nullable":"YES","Type":"varchar"} {"Name":"boolfield","Nullable":"YES","Type":"boolean"} {"Name":"flow_document","Nullable":"YES","Type":"varchar"} -{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"} +{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"} {"Name":"intfield","Nullable":"YES","Type":"bigint"} {"Name":"key","Nullable":"YES","Type":"varchar"} {"Name":"multiplefield","Nullable":"YES","Type":"varchar"} {"Name":"numfield","Nullable":"YES","Type":"double"} {"Name":"objfield","Nullable":"YES","Type":"varchar"} -{"Name":"stringdatefield","Nullable":"YES","Type":"varchar"} -{"Name":"stringdatetimefield","Nullable":"YES","Type":"varchar"} +{"Name":"stringdatefield","Nullable":"YES","Type":"date"} +{"Name":"stringdatetimefield","Nullable":"YES","Type":"timestamp(6) with time zone"} {"Name":"stringdurationfield","Nullable":"YES","Type":"varchar"} {"Name":"stringemailfield","Nullable":"YES","Type":"varchar"} {"Name":"stringfield","Nullable":"YES","Type":"varchar"} @@ -235,7 +235,7 @@ Big Schema Materialized Resource Schema 
Changed Types With Table Replacement: {"Name":"arrayfield","Nullable":"YES","Type":"varchar"} {"Name":"boolfield","Nullable":"YES","Type":"bigint"} {"Name":"flow_document","Nullable":"YES","Type":"varchar"} -{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"} +{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"} {"Name":"intfield","Nullable":"YES","Type":"varchar"} {"Name":"key","Nullable":"YES","Type":"varchar"} {"Name":"multiplefield","Nullable":"YES","Type":"varchar"} @@ -273,7 +273,7 @@ add a single field: {"Name":"_meta/flow_truncated","Nullable":"YES","Type":"boolean"} {"Name":"addedoptionalstring","Nullable":"YES","Type":"varchar"} {"Name":"flow_document","Nullable":"YES","Type":"varchar"} -{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"} +{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"} {"Name":"key","Nullable":"YES","Type":"varchar"} {"Name":"optionalboolean","Nullable":"YES","Type":"boolean"} {"Name":"optionalinteger","Nullable":"YES","Type":"bigint"} @@ -287,7 +287,7 @@ add a single field: remove a single optional field: {"Name":"_meta/flow_truncated","Nullable":"YES","Type":"boolean"} {"Name":"flow_document","Nullable":"YES","Type":"varchar"} -{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"} +{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"} {"Name":"key","Nullable":"YES","Type":"varchar"} {"Name":"optionalboolean","Nullable":"YES","Type":"boolean"} {"Name":"optionalinteger","Nullable":"YES","Type":"bigint"} @@ -301,7 +301,7 @@ remove a single optional field: remove a single required field: {"Name":"_meta/flow_truncated","Nullable":"YES","Type":"boolean"} {"Name":"flow_document","Nullable":"YES","Type":"varchar"} -{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"} +{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"} {"Name":"key","Nullable":"YES","Type":"varchar"} 
{"Name":"optionalboolean","Nullable":"YES","Type":"boolean"} {"Name":"optionalinteger","Nullable":"YES","Type":"bigint"} @@ -317,7 +317,7 @@ add and remove many fields: {"Name":"addedoptionalstring","Nullable":"YES","Type":"varchar"} {"Name":"addedrequiredstring","Nullable":"YES","Type":"varchar"} {"Name":"flow_document","Nullable":"YES","Type":"varchar"} -{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"} +{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"} {"Name":"key","Nullable":"YES","Type":"varchar"} {"Name":"optionalboolean","Nullable":"YES","Type":"boolean"} {"Name":"optionalinteger","Nullable":"YES","Type":"bigint"} @@ -337,7 +337,7 @@ Challenging Field Names Materialized Columns: {"Name":"_id","Nullable":"YES","Type":"varchar"} {"Name":"a\"string`with`quote'characters","Nullable":"YES","Type":"varchar"} {"Name":"flow_document","Nullable":"YES","Type":"varchar"} -{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"} +{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"} {"Name":"value with separated words","Nullable":"YES","Type":"varchar"} {"Name":"value-with-separated-words","Nullable":"YES","Type":"varchar"} {"Name":"value.with-separated_words","Nullable":"YES","Type":"varchar"} diff --git a/materialize-starburst/client.go b/materialize-starburst/client.go index ebd3ed2f91..97db23a5c8 100644 --- a/materialize-starburst/client.go +++ b/materialize-starburst/client.go @@ -39,7 +39,7 @@ func newClient(_ context.Context, ep *sql.Endpoint) (sql.Client, error) { return nil, err } - var templates = renderTemplates(starburstDialect) + var templates = renderTemplates(starburstTrinoDialect) return &client{ db: db, diff --git a/materialize-starburst/sqlgen.go b/materialize-starburst/sqlgen.go index def1047d92..6a33cb12b1 100644 --- a/materialize-starburst/sqlgen.go +++ b/materialize-starburst/sqlgen.go @@ -7,10 +7,12 @@ import ( "slices" "strings" "text/template" + "time" sql 
"github.com/estuary/connectors/materialize-sql" "github.com/estuary/flow/go/protocols/fdb/tuple" pf "github.com/estuary/flow/go/protocols/flow" + "github.com/trinodb/trino-go-client/trino" ) var simpleIdentifierRegexp = regexp.MustCompile(`^[_\pL]+[_\pL\pN]*$`) @@ -41,8 +43,28 @@ var doubleConverter sql.ElementConverter = func(te tuple.TupleElement) (interfac return fmt.Sprintf("%v", te), nil } -// starburstDialect returns a representation of the Starburst SQL dialect. -var starburstDialect = func() sql.Dialect { +var timestampConverter sql.ElementConverter = func(te tuple.TupleElement) (interface{}, error) { + parsed, err := time.Parse(time.RFC3339, te.(string)) + if err != nil { + return nil, err + } + timestamp := trino.Timestamp(parsed.Year(), parsed.Month(), parsed.Day(), parsed.Hour(), parsed.Minute(), parsed.Second(), parsed.Nanosecond()) + return timestamp, nil +} + +// starburstTrinoDialect returns a representation of the Starburst Trino SQL dialect used for target table. +var starburstTrinoDialect = func() sql.Dialect { + dateTimeMapper := sql.NewStaticMapper("TIMESTAMP(6) WITH TIME ZONE", sql.WithElementConverter(timestampConverter)) + return starburstDialect(sql.NewStaticMapper("DATE"), dateTimeMapper) +}() + +// starburstHiveDialect returns a representation of the Starburst Hive format SQL dialect used for temp table. 
+var starburstHiveDialect = func() sql.Dialect { + return starburstDialect(sql.NewStaticMapper("VARCHAR"), sql.NewStaticMapper("VARCHAR")) +}() + +var starburstDialect = func(dateMapper sql.TypeMapper, dateTimeMapper sql.TypeMapper) sql.Dialect { + var mapper sql.TypeMapper = sql.ProjectionTypeMapper{ sql.ARRAY: sql.NewStaticMapper("VARCHAR", sql.WithElementConverter(jsonConverter)), sql.BINARY: sql.NewStaticMapper("VARBINARY"), @@ -62,6 +84,8 @@ var starburstDialect = func() sql.Dialect { PrimaryKey: sql.NewStaticMapper("STRING"), Delegate: sql.NewStaticMapper("DOUBLE", sql.WithElementConverter(sql.StdStrToFloat("NaN", "inf", "-inf"))), }, + "date": dateMapper, + "date-time": dateTimeMapper, }, }, } @@ -71,6 +95,8 @@ var starburstDialect = func() sql.Dialect { sql.ColValidation{Types: []string{"boolean"}, Validate: sql.BooleanCompatible}, sql.ColValidation{Types: []string{"bigint"}, Validate: sql.IntegerCompatible}, sql.ColValidation{Types: []string{"double"}, Validate: sql.NumberCompatible}, + sql.ColValidation{Types: []string{"date"}, Validate: sql.DateCompatible}, + sql.ColValidation{Types: []string{"timestamp(6) with time zone"}, Validate: sql.DateTimeCompatible}, ) return sql.Dialect{ @@ -106,7 +132,7 @@ var starburstDialect = func() sql.Dialect { MaxColumnCharLength: 0, // Starburst has no limit on how long column names can be that I can find CaseInsensitiveColumns: true, } -}() +} // stringCompatible allow strings of any format, arrays, objects, or fields with multiple types to // be materialized, since these are all materialized as VARCHAR columns. 
@@ -225,7 +251,7 @@ MERGE INTO {{ $.Identifier }} AS l WHEN MATCHED THEN UPDATE SET {{ range $ind, $val := $.Values }} {{- if $ind }}, {{ end -}} - {{ $val.Identifier }} = r.{{ $val.Identifier }} + {{ $val.Identifier }} = {{ template "cast" $val }} {{- end -}} {{- if $.Document -}} {{ if $.Values }}, {{ end }}{{ $.Document.Identifier}} = r.{{ $.Document.Identifier }} @@ -234,13 +260,13 @@ MERGE INTO {{ $.Identifier }} AS l INSERT ( {{- range $ind, $col := $.Columns }} {{- if $ind }}, {{ end -}} - {{$col.Identifier -}} + {{$col.Identifier -}} {{- end -}} ) VALUES ( {{- range $ind, $col := $.Columns }} {{- if $ind }}, {{ end -}} - r.{{ $col.Identifier }} + {{ template "cast" $col -}} {{- end -}} ) {{ end }} @@ -251,6 +277,16 @@ DROP TABLE IF EXISTS {{$.Identifier}}_store_temp {{ define "alterTableColumns" }} ALTER TABLE {{.TableIdentifier}} ADD COLUMN {{.ColumnIdentifier}} {{.NullableDDL}} +{{ end }} + +{{ define "cast" }} +{{- if Contains $.DDL "DATE" -}} + from_iso8601_date(r.{{ $.Identifier}}) +{{- else if Contains $.DDL "TIMESTAMP" -}} + from_iso8601_timestamp_nanos(r.{{ $.Identifier}}) +{{- else -}} + r.{{ $.Identifier }} +{{- end -}} {{ end }} `) diff --git a/materialize-starburst/sqlgen_test.go b/materialize-starburst/sqlgen_test.go index 6f07886490..4d1891b8ed 100644 --- a/materialize-starburst/sqlgen_test.go +++ b/materialize-starburst/sqlgen_test.go @@ -14,8 +14,8 @@ import ( "github.com/stretchr/testify/require" ) -var targetTableDialect = starburstDialect -var tempTableDialect = starburstDialect +var targetTableDialect = starburstTrinoDialect +var tempTableDialect = starburstHiveDialect func TestSQLGeneration(t *testing.T) { var spec *pf.MaterializationSpec diff --git a/materialize-starburst/starburst.go b/materialize-starburst/starburst.go index e16b9a8174..47438533cc 100644 --- a/materialize-starburst/starburst.go +++ b/materialize-starburst/starburst.go @@ -123,11 +123,11 @@ func newStarburstDriver() *sql.Driver { var metaBase sql.TablePath var 
metaSpecs, _ = sql.MetaTables(metaBase) - var templates = renderTemplates(starburstDialect) + var templates = renderTemplates(starburstTrinoDialect) return &sql.Endpoint{ Config: cfg, - Dialect: starburstDialect, + Dialect: starburstTrinoDialect, MetaSpecs: &metaSpecs, NewClient: newClient, CreateTableTemplate: templates.createTargetTable, @@ -161,7 +161,7 @@ func newTransactor( open pm.Request_Open, ) (_ m.Transactor, err error) { var cfg = ep.Config.(*config) - var templates = renderTemplates(starburstDialect) + var templates = renderTemplates(starburstTrinoDialect) var transactor = &transactor{ cfg: cfg, @@ -216,20 +216,27 @@ func (t *transactor) addBinding(ctx context.Context, materializationSpec *pf.Mat var err error d.target = target - if d.load.createTempTable, err = sql.RenderTableTemplate(target, templates.createLoadTempTable); err != nil { + // Temp tables are rendered with the Hive dialect, so resolve the table shape again with it. + tempTable, err := sql.ResolveTable(target.TableShape, starburstHiveDialect) + if err != nil { + return fmt.Errorf("resolving temp table: %w", err) + } + templatesTemp := renderTemplates(starburstHiveDialect) + + if d.load.createTempTable, err = sql.RenderTableTemplate(tempTable, templatesTemp.createLoadTempTable); err != nil { return fmt.Errorf("createLoadTempTable template: %w", err) } - if d.load.dropTempTable, err = sql.RenderTableTemplate(target, templates.dropLoadTempTable); err != nil { + if d.load.dropTempTable, err = sql.RenderTableTemplate(tempTable, templatesTemp.dropLoadTempTable); err != nil { return fmt.Errorf("dropLoadTempTable template: %w", err) } if d.load.loadQuery, err = sql.RenderTableTemplate(target, templates.loadQuery); err != nil { return fmt.Errorf("loadQuery template: %w", err) } - if d.store.createTempTable, err = sql.RenderTableTemplate(target, templates.createStoreTempTable); err != nil { + if d.store.createTempTable, err = sql.RenderTableTemplate(tempTable, templatesTemp.createStoreTempTable); err != nil { return fmt.Errorf("createStoreTempTable template: %w", err) } - if d.store.dropTempTable, err = sql.RenderTableTemplate(target, templates.dropStoreTempTable);
err != nil { + if d.store.dropTempTable, err = sql.RenderTableTemplate(tempTable, templatesTemp.dropStoreTempTable); err != nil { return fmt.Errorf("dropStoreTempTable template: %w", err) } if d.store.mergeIntoTarget, err = sql.RenderTableTemplate(target, templates.mergeIntoTarget); err != nil {