Skip to content

Commit

Permalink
Add date and date-time support for starburst-materialization
Browse files Browse the repository at this point in the history
  • Loading branch information
pajaks authored and williamhbaker committed Mar 11, 2024
1 parent 4c0a168 commit 799c8a2
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 34 deletions.
8 changes: 4 additions & 4 deletions materialize-starburst/.snapshots/TestSQLGeneration
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ CREATE TABLE IF NOT EXISTS "a-schema".target_table (
integer BIGINT,
number DOUBLE,
string VARCHAR,
datetime VARCHAR,
date VARCHAR,
datetime TIMESTAMP(6) WITH TIME ZONE,
date DATE,
flow_document VARCHAR
)
COMMENT 'Generated for materialization test/sqlite of collection key/value'
Expand All @@ -40,10 +40,10 @@ MERGE INTO "a-schema".target_table AS l
USING "a-schema".target_table_store_temp AS r
ON l.key1 = r.key1 AND l.key2 = r.key2
WHEN MATCHED THEN
UPDATE SET boolean = r.boolean, integer = r.integer, number = r.number, string = r.string, datetime = r.datetime, date = r.date, flow_document = r.flow_document
UPDATE SET boolean = r.boolean, integer = r.integer, number = r.number, string = r.string, datetime = from_iso8601_timestamp_nanos(r.datetime), date = from_iso8601_date(r.date), flow_document = r.flow_document
WHEN NOT MATCHED THEN
INSERT (key1, key2, boolean, integer, number, string, datetime, date, flow_document)
VALUES (r.key1, r.key2, r.boolean, r.integer, r.number, r.string, r.datetime, r.date, r.flow_document)
VALUES (r.key1, r.key2, r.boolean, r.integer, r.number, r.string, from_iso8601_timestamp_nanos(r.datetime), from_iso8601_date(r.date), r.flow_document)
--- End "a-schema".target_table mergeIntoTarget ---

--- Begin "a-schema".target_table createLoadTempTable ---
Expand Down
28 changes: 14 additions & 14 deletions materialize-starburst/.snapshots/TestValidateAndApply
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ Big Schema Changed Types Constraints:
{"Field":"nullField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
{"Field":"numField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'numField' is already being materialized as endpoint type 'DOUBLE' but endpoint type 'BOOLEAN' is required by its schema '{ type: [boolean] }'"}
{"Field":"objField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"stringDateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"stringDateTimeField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"stringDateField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringDateField' is already being materialized as endpoint type 'DATE' but endpoint type 'VARCHAR' is required by its schema '{ type: [string] }'"}
{"Field":"stringDateTimeField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringDateTimeField' is already being materialized as endpoint type 'TIMESTAMP(6) WITH TIME ZONE' but endpoint type 'VARCHAR' is required by its schema '{ type: [string] }'"}
{"Field":"stringDurationField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"stringEmailField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"stringField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringField' is already being materialized as endpoint type 'VARCHAR' but endpoint type 'BIGINT' is required by its schema '{ type: [integer] }'"}
Expand Down Expand Up @@ -120,14 +120,14 @@ Big Schema Materialized Resource Schema With All Fields Required:
{"Name":"arrayfield","Nullable":"YES","Type":"varchar"}
{"Name":"boolfield","Nullable":"YES","Type":"boolean"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"intfield","Nullable":"YES","Type":"bigint"}
{"Name":"key","Nullable":"YES","Type":"varchar"}
{"Name":"multiplefield","Nullable":"YES","Type":"varchar"}
{"Name":"numfield","Nullable":"YES","Type":"double"}
{"Name":"objfield","Nullable":"YES","Type":"varchar"}
{"Name":"stringdatefield","Nullable":"YES","Type":"varchar"}
{"Name":"stringdatetimefield","Nullable":"YES","Type":"varchar"}
{"Name":"stringdatefield","Nullable":"YES","Type":"date"}
{"Name":"stringdatetimefield","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"stringdurationfield","Nullable":"YES","Type":"varchar"}
{"Name":"stringemailfield","Nullable":"YES","Type":"varchar"}
{"Name":"stringfield","Nullable":"YES","Type":"varchar"}
Expand Down Expand Up @@ -158,14 +158,14 @@ Big Schema Materialized Resource Schema With No Fields Required:
{"Name":"arrayfield","Nullable":"YES","Type":"varchar"}
{"Name":"boolfield","Nullable":"YES","Type":"boolean"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"intfield","Nullable":"YES","Type":"bigint"}
{"Name":"key","Nullable":"YES","Type":"varchar"}
{"Name":"multiplefield","Nullable":"YES","Type":"varchar"}
{"Name":"numfield","Nullable":"YES","Type":"double"}
{"Name":"objfield","Nullable":"YES","Type":"varchar"}
{"Name":"stringdatefield","Nullable":"YES","Type":"varchar"}
{"Name":"stringdatetimefield","Nullable":"YES","Type":"varchar"}
{"Name":"stringdatefield","Nullable":"YES","Type":"date"}
{"Name":"stringdatetimefield","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"stringdurationfield","Nullable":"YES","Type":"varchar"}
{"Name":"stringemailfield","Nullable":"YES","Type":"varchar"}
{"Name":"stringfield","Nullable":"YES","Type":"varchar"}
Expand Down Expand Up @@ -235,7 +235,7 @@ Big Schema Materialized Resource Schema Changed Types With Table Replacement:
{"Name":"arrayfield","Nullable":"YES","Type":"varchar"}
{"Name":"boolfield","Nullable":"YES","Type":"bigint"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"intfield","Nullable":"YES","Type":"varchar"}
{"Name":"key","Nullable":"YES","Type":"varchar"}
{"Name":"multiplefield","Nullable":"YES","Type":"varchar"}
Expand Down Expand Up @@ -273,7 +273,7 @@ add a single field:
{"Name":"_meta/flow_truncated","Nullable":"YES","Type":"boolean"}
{"Name":"addedoptionalstring","Nullable":"YES","Type":"varchar"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"key","Nullable":"YES","Type":"varchar"}
{"Name":"optionalboolean","Nullable":"YES","Type":"boolean"}
{"Name":"optionalinteger","Nullable":"YES","Type":"bigint"}
Expand All @@ -287,7 +287,7 @@ add a single field:
remove a single optional field:
{"Name":"_meta/flow_truncated","Nullable":"YES","Type":"boolean"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"key","Nullable":"YES","Type":"varchar"}
{"Name":"optionalboolean","Nullable":"YES","Type":"boolean"}
{"Name":"optionalinteger","Nullable":"YES","Type":"bigint"}
Expand All @@ -301,7 +301,7 @@ remove a single optional field:
remove a single required field:
{"Name":"_meta/flow_truncated","Nullable":"YES","Type":"boolean"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"key","Nullable":"YES","Type":"varchar"}
{"Name":"optionalboolean","Nullable":"YES","Type":"boolean"}
{"Name":"optionalinteger","Nullable":"YES","Type":"bigint"}
Expand All @@ -317,7 +317,7 @@ add and remove many fields:
{"Name":"addedoptionalstring","Nullable":"YES","Type":"varchar"}
{"Name":"addedrequiredstring","Nullable":"YES","Type":"varchar"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"key","Nullable":"YES","Type":"varchar"}
{"Name":"optionalboolean","Nullable":"YES","Type":"boolean"}
{"Name":"optionalinteger","Nullable":"YES","Type":"bigint"}
Expand All @@ -337,7 +337,7 @@ Challenging Field Names Materialized Columns:
{"Name":"_id","Nullable":"YES","Type":"varchar"}
{"Name":"a\"string`with`quote'characters","Nullable":"YES","Type":"varchar"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"value with separated words","Nullable":"YES","Type":"varchar"}
{"Name":"value-with-separated-words","Nullable":"YES","Type":"varchar"}
{"Name":"value.with-separated_words","Nullable":"YES","Type":"varchar"}
Expand Down
2 changes: 1 addition & 1 deletion materialize-starburst/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func newClient(_ context.Context, ep *sql.Endpoint) (sql.Client, error) {
return nil, err
}

var templates = renderTemplates(starburstDialect)
var templates = renderTemplates(starburstTrinoDialect)

return &client{
db: db,
Expand Down
48 changes: 42 additions & 6 deletions materialize-starburst/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"slices"
"strings"
"text/template"
"time"

sql "github.com/estuary/connectors/materialize-sql"
"github.com/estuary/flow/go/protocols/fdb/tuple"
pf "github.com/estuary/flow/go/protocols/flow"
"github.com/trinodb/trino-go-client/trino"
)

var simpleIdentifierRegexp = regexp.MustCompile(`^[_\pL]+[_\pL\pN]*$`)
Expand Down Expand Up @@ -41,8 +43,28 @@ var doubleConverter sql.ElementConverter = func(te tuple.TupleElement) (interfac
return fmt.Sprintf("%v", te), nil
}

// starburstDialect returns a representation of the Starburst SQL dialect.
var starburstDialect = func() sql.Dialect {
var timestampConverter sql.ElementConverter = func(te tuple.TupleElement) (interface{}, error) {
parsed, err := time.Parse(time.RFC3339, te.(string))
if err != nil {
return nil, err
}
timestamp := trino.Timestamp(parsed.Year(), parsed.Month(), parsed.Day(), parsed.Hour(), parsed.Minute(), parsed.Second(), parsed.Nanosecond())
return timestamp, nil
}

// starburstTrinoDialect returns a representation of the Starburst Trino SQL dialect used for target table.
var starburstTrinoDialect = func() sql.Dialect {
dateTimeMapper := sql.NewStaticMapper("TIMESTAMP(6) WITH TIME ZONE", sql.WithElementConverter(timestampConverter))
return starburstDialect(sql.NewStaticMapper("DATE"), dateTimeMapper)
}()

// starburstHiveDialect returns a representation of the Starburst Hive format SQL dialect used for temp table.
var starburstHiveDialect = func() sql.Dialect {
return starburstDialect(sql.NewStaticMapper("VARCHAR"), sql.NewStaticMapper("VARCHAR"))
}()

var starburstDialect = func(dateMapper sql.TypeMapper, dateTimeMapper sql.TypeMapper) sql.Dialect {

var mapper sql.TypeMapper = sql.ProjectionTypeMapper{
sql.ARRAY: sql.NewStaticMapper("VARCHAR", sql.WithElementConverter(jsonConverter)),
sql.BINARY: sql.NewStaticMapper("VARBINARY"),
Expand All @@ -62,6 +84,8 @@ var starburstDialect = func() sql.Dialect {
PrimaryKey: sql.NewStaticMapper("STRING"),
Delegate: sql.NewStaticMapper("DOUBLE", sql.WithElementConverter(sql.StdStrToFloat("NaN", "inf", "-inf"))),
},
"date": dateMapper,
"date-time": dateTimeMapper,
},
},
}
Expand All @@ -71,6 +95,8 @@ var starburstDialect = func() sql.Dialect {
sql.ColValidation{Types: []string{"boolean"}, Validate: sql.BooleanCompatible},
sql.ColValidation{Types: []string{"bigint"}, Validate: sql.IntegerCompatible},
sql.ColValidation{Types: []string{"double"}, Validate: sql.NumberCompatible},
sql.ColValidation{Types: []string{"date"}, Validate: sql.DateCompatible},
sql.ColValidation{Types: []string{"timestamp(6) with time zone"}, Validate: sql.DateTimeCompatible},
)

return sql.Dialect{
Expand Down Expand Up @@ -106,7 +132,7 @@ var starburstDialect = func() sql.Dialect {
MaxColumnCharLength: 0, // Starburst has no limit on how long column names can be that I can find
CaseInsensitiveColumns: true,
}
}()
}

// stringCompatible allow strings of any format, arrays, objects, or fields with multiple types to
// be materialized, since these are all materialized as VARCHAR columns.
Expand Down Expand Up @@ -225,7 +251,7 @@ MERGE INTO {{ $.Identifier }} AS l
WHEN MATCHED THEN
UPDATE SET {{ range $ind, $val := $.Values }}
{{- if $ind }}, {{ end -}}
{{ $val.Identifier }} = r.{{ $val.Identifier }}
{{ $val.Identifier }} = {{ template "cast" $val }}
{{- end -}}
{{- if $.Document -}}
{{ if $.Values }}, {{ end }}{{ $.Document.Identifier}} = r.{{ $.Document.Identifier }}
Expand All @@ -234,13 +260,13 @@ MERGE INTO {{ $.Identifier }} AS l
INSERT (
{{- range $ind, $col := $.Columns }}
{{- if $ind }}, {{ end -}}
{{$col.Identifier -}}
{{$col.Identifier -}}
{{- end -}}
)
VALUES (
{{- range $ind, $col := $.Columns }}
{{- if $ind }}, {{ end -}}
r.{{ $col.Identifier }}
{{ template "cast" $col -}}
{{- end -}}
)
{{ end }}
Expand All @@ -251,6 +277,16 @@ DROP TABLE IF EXISTS {{$.Identifier}}_store_temp
{{ define "alterTableColumns" }}
ALTER TABLE {{.TableIdentifier}} ADD COLUMN {{.ColumnIdentifier}} {{.NullableDDL}}
{{ end }}
{{ define "cast" }}
{{- if Contains $.DDL "DATE" -}}
from_iso8601_date(r.{{ $.Identifier}})
{{- else if Contains $.DDL "TIMESTAMP" -}}
from_iso8601_timestamp_nanos(r.{{ $.Identifier}})
{{- else -}}
r.{{ $.Identifier }}
{{- end -}}
{{ end }}
`)
Expand Down
4 changes: 2 additions & 2 deletions materialize-starburst/sqlgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/stretchr/testify/require"
)

var targetTableDialect = starburstDialect
var tempTableDialect = starburstDialect
var targetTableDialect = starburstTrinoDialect
var tempTableDialect = starburstHiveDialect

func TestSQLGeneration(t *testing.T) {
var spec *pf.MaterializationSpec
Expand Down
17 changes: 10 additions & 7 deletions materialize-starburst/starburst.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ func newStarburstDriver() *sql.Driver {

var metaBase sql.TablePath
var metaSpecs, _ = sql.MetaTables(metaBase)
var templates = renderTemplates(starburstDialect)
var templates = renderTemplates(starburstTrinoDialect)

return &sql.Endpoint{
Config: cfg,
Dialect: starburstDialect,
Dialect: starburstTrinoDialect,
MetaSpecs: &metaSpecs,
NewClient: newClient,
CreateTableTemplate: templates.createTargetTable,
Expand Down Expand Up @@ -161,7 +161,7 @@ func newTransactor(
open pm.Request_Open,
) (_ m.Transactor, err error) {
var cfg = ep.Config.(*config)
var templates = renderTemplates(starburstDialect)
var templates = renderTemplates(starburstTrinoDialect)

var transactor = &transactor{
cfg: cfg,
Expand Down Expand Up @@ -216,20 +216,23 @@ func (t *transactor) addBinding(ctx context.Context, materializationSpec *pf.Mat
var err error
d.target = target

if d.load.createTempTable, err = sql.RenderTableTemplate(target, templates.createLoadTempTable); err != nil {
tempTable, _ := sql.ResolveTable(target.TableShape, starburstHiveDialect)
templatesTemp := renderTemplates(starburstHiveDialect)

if d.load.createTempTable, err = sql.RenderTableTemplate(tempTable, templatesTemp.createLoadTempTable); err != nil {
return fmt.Errorf("createLoadTempTable template: %w", err)
}
if d.load.dropTempTable, err = sql.RenderTableTemplate(target, templates.dropLoadTempTable); err != nil {
if d.load.dropTempTable, err = sql.RenderTableTemplate(tempTable, templatesTemp.dropLoadTempTable); err != nil {
return fmt.Errorf("dropLoadTempTable template: %w", err)
}
if d.load.loadQuery, err = sql.RenderTableTemplate(target, templates.loadQuery); err != nil {
return fmt.Errorf("loadQuery template: %w", err)
}

if d.store.createTempTable, err = sql.RenderTableTemplate(target, templates.createStoreTempTable); err != nil {
if d.store.createTempTable, err = sql.RenderTableTemplate(tempTable, templatesTemp.createStoreTempTable); err != nil {
return fmt.Errorf("createStoreTempTable template: %w", err)
}
if d.store.dropTempTable, err = sql.RenderTableTemplate(target, templates.dropStoreTempTable); err != nil {
if d.store.dropTempTable, err = sql.RenderTableTemplate(tempTable, templatesTemp.dropStoreTempTable); err != nil {
return fmt.Errorf("dropStoreTempTable template: %w", err)
}
if d.store.mergeIntoTarget, err = sql.RenderTableTemplate(target, templates.mergeIntoTarget); err != nil {
Expand Down

0 comments on commit 799c8a2

Please sign in to comment.