Skip to content

Commit

Permalink
Merge branch 'main' into salesforce-source
Browse files (browse the repository at this point in the history)
  • Loading branch information
Luishfs authored Mar 11, 2024
2 parents e039e79 + e8ed6f4 commit da6949c
Show file tree
Hide file tree
Showing 249 changed files with 4,726 additions and 4,439 deletions.
12 changes: 0 additions & 12 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,6 @@ jobs:
- connector: source-shopify
connector_type: capture
python: true
- connector: source-airtable
connector_type: capture
python: true
- connector: source-google-ads
connector_type: capture
python: true
- connector: source-facebook-marketing
connector_type: capture
python: true
- connector: source-hubspot
connector_type: capture
python: true

steps:
- uses: actions/checkout@v2
Expand Down
27 changes: 25 additions & 2 deletions .github/workflows/python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,28 @@ on:
branches: [main]
paths:
- "estuary-cdk/**"
- "source-airtable/**"
- "source-asana/**"
- "source-facebook-marketing/**"
- "source-gladly/**"
- "source-google-ads/**"
- "source-google-sheets-native/**"
- "source-hubspot-native/**"
- "source-salesforce/**"
- "source-hubspot/**"
pull_request:
branches: [main]
paths:
- "estuary-cdk/**"
- "source-airtable/**"
- "source-asana/**"
- "source-facebook-marketing/**"
- "source-gladly/**"
- "source-google-ads/**"
- "source-google-sheets-native/**"
- "source-hubspot-native/**"
- "source-salesforce/**"
- "source-hubspot/**"

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
Expand All @@ -34,25 +42,40 @@ jobs:
# Note that every entry here must specify usage_rate. We're unable to
# set a default and selectively override it with `0.0`, because GH actions
# considers zero values to be "unset" (which is bs and I'm salty about it).
- name: source-asana
- name: source-airtable
type: capture
version: v1
usage_rate: "1.0"
- name: source-hubspot-native
- name: source-asana
type: capture
version: v1
usage_rate: "1.0"
- name: source-facebook-marketing
type: capture
version: v4
usage_rate: "1.0"
- name: source-gladly
type: capture
version: v1
usage_rate: "1.0"
- name: source-google-ads
type: capture
version: v2
usage_rate: "1.0"
- name: source-google-sheets-native
type: capture
version: v1
usage_rate: "0.0"
- name: source-salesforce
type: capture
version: v2
- name: source-hubspot-native
type: capture
version: v1
usage_rate: "1.0"
- name: source-hubspot
type: capture
version: v5
usage_rate: "1.0"

steps:
Expand Down
2 changes: 1 addition & 1 deletion estuary-cdk/estuary_cdk/shim_airbyte_cdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async def _all_resources(
if stream.namespace:
resource_config.namespace = stream.namespace
if stream.default_cursor_field:
resource_config.cursor_field = [stream.default_cursor_field]
resource_config.cursor_field = stream.default_cursor_field

if stream.source_defined_primary_key:
# Map array of array of property names into an array of JSON pointers.
Expand Down
46 changes: 23 additions & 23 deletions materialize-databricks/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,6 @@ type transactor struct {

localStagingPath string

// Variables exclusively used by Load.
load struct {
conn *stdsql.Conn
}
// Variables exclusively used by Store.
store struct {
conn *stdsql.Conn
}
bindings []*binding

updateDelay time.Duration
Expand Down Expand Up @@ -195,17 +187,11 @@ func newTransactor(
}
}

// Establish connections.
if db, err := stdsql.Open("databricks", cfg.ToURI()); err != nil {
return nil, fmt.Errorf("load sql.Open: %w", err)
} else if d.load.conn, err = db.Conn(ctx); err != nil {
return nil, fmt.Errorf("load db.Conn: %w", err)
}
if db, err := stdsql.Open("databricks", cfg.ToURI()); err != nil {
return nil, fmt.Errorf("store sql.Open: %w", err)
} else if d.store.conn, err = db.Conn(ctx); err != nil {
return nil, fmt.Errorf("store db.Conn: %w", err)
db, err := stdsql.Open("databricks", d.cfg.ToURI())
if err != nil {
return nil, fmt.Errorf("sql.Open: %w", err)
}
defer db.Close()

for _, binding := range bindings {
if err = d.addBinding(ctx, binding, open.Range); err != nil {
Expand All @@ -214,7 +200,7 @@ func newTransactor(
}

// Create volume for storing staged files
if _, err := d.store.conn.ExecContext(ctx, fmt.Sprintf("CREATE VOLUME IF NOT EXISTS `%s`.`%s`;", cfg.SchemaName, volumeName)); err != nil {
if _, err := db.ExecContext(ctx, fmt.Sprintf("CREATE VOLUME IF NOT EXISTS `%s`.`%s`;", cfg.SchemaName, volumeName)); err != nil {
return nil, fmt.Errorf("Exec(CREATE VOLUME IF NOT EXISTS %s;): %w", volumeName, err)
}

Expand Down Expand Up @@ -320,10 +306,16 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
log.Info("load: starting join query")

if len(queries) > 0 {
db, err := stdsql.Open("databricks", d.cfg.ToURI())
if err != nil {
return fmt.Errorf("sql.Open: %w", err)
}
defer db.Close()

// Issue a union join of the target tables and their (now staged) load keys,
// and send results to the |loaded| callback.
var unionQuery = strings.Join(queries, "\nUNION ALL\n")
rows, err := d.load.conn.QueryContext(ctx, unionQuery)
rows, err := db.QueryContext(ctx, unionQuery)
if err != nil {
return fmt.Errorf("querying Load documents: %w", err)
}
Expand Down Expand Up @@ -454,14 +446,24 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error)
func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error) {
log.Info("store: starting committing changes")

var db *stdsql.DB
if len(d.cp) > 0 {
var err error
db, err = stdsql.Open("databricks", d.cfg.ToURI())
if err != nil {
return nil, fmt.Errorf("sql.Open: %w", err)
}
defer db.Close()
}

for stateKey, item := range d.cp {
// we skip queries that belong to tables which do not have a binding anymore
// since these tables might be deleted already
if !d.hasStateKey(stateKey) {
continue
}

if _, err := d.store.conn.ExecContext(ctx, item.Query); err != nil {
if _, err := db.ExecContext(ctx, item.Query); err != nil {
// When doing a recovery apply, it may be the case that some tables & files have already been deleted after being applied
// it is okay to skip them in this case
if d.cpRecovery {
Expand Down Expand Up @@ -517,8 +519,6 @@ func pathsWithRoot(root string, paths []string) []string {
}

func (d *transactor) Destroy() {
d.load.conn.Close()
d.store.conn.Close()
}

func main() {
Expand Down
46 changes: 26 additions & 20 deletions materialize-starburst/.snapshots/TestSQLGeneration
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ SELECT
boolean,
integer,
number,
string
string,
datetime,
date
FROM "a-schema".target_table
WHERE
key1 = ?
Expand All @@ -18,11 +20,32 @@ CREATE TABLE IF NOT EXISTS "a-schema".target_table (
integer BIGINT,
number DOUBLE,
string VARCHAR,
datetime TIMESTAMP(6) WITH TIME ZONE,
date DATE,
flow_document VARCHAR
)
COMMENT 'Generated for materialization test/sqlite of collection key/value'
--- End "a-schema".target_table createTargetTable ---

--- Begin "a-schema".target_table loadQuery ---
SELECT 0, flow_document
FROM "a-schema".target_table AS l
JOIN "a-schema".target_table_load_temp AS r
ON l.key1 = r.key1 AND l.key2 = r.key2

--- End "a-schema".target_table loadQuery ---

--- Begin "a-schema".target_table mergeIntoTarget ---
MERGE INTO "a-schema".target_table AS l
USING "a-schema".target_table_store_temp AS r
ON l.key1 = r.key1 AND l.key2 = r.key2
WHEN MATCHED THEN
UPDATE SET boolean = r.boolean, integer = r.integer, number = r.number, string = r.string, datetime = from_iso8601_timestamp_nanos(r.datetime), date = from_iso8601_date(r.date), flow_document = r.flow_document
WHEN NOT MATCHED THEN
INSERT (key1, key2, boolean, integer, number, string, datetime, date, flow_document)
VALUES (r.key1, r.key2, r.boolean, r.integer, r.number, r.string, from_iso8601_timestamp_nanos(r.datetime), from_iso8601_date(r.date), r.flow_document)
--- End "a-schema".target_table mergeIntoTarget ---

--- Begin "a-schema".target_table createLoadTempTable ---
CREATE TABLE "a-schema".target_table_load_temp (
key1 BIGINT,
Expand All @@ -39,14 +62,6 @@ WITH (
DROP TABLE IF EXISTS "a-schema".target_table_load_temp
--- End "a-schema".target_table dropLoadTempTable ---

--- Begin "a-schema".target_table loadQuery ---
SELECT 0, flow_document
FROM "a-schema".target_table AS l
JOIN "a-schema".target_table_load_temp AS r
ON l.key1 = r.key1 AND l.key2 = r.key2

--- End "a-schema".target_table loadQuery ---

--- Begin "a-schema".target_table createStoreTempTable ---
CREATE TABLE "a-schema".target_table_store_temp (
key1 BIGINT,
Expand All @@ -55,6 +70,8 @@ CREATE TABLE "a-schema".target_table_store_temp (
integer BIGINT,
number DOUBLE,
string VARCHAR,
datetime VARCHAR,
date VARCHAR,
flow_document VARCHAR
)
WITH (
Expand All @@ -68,17 +85,6 @@ WITH (
DROP TABLE IF EXISTS "a-schema".target_table_store_temp
--- End "a-schema".target_table dropStoreTempTable ---

--- Begin "a-schema".target_table mergeIntoTarget ---
MERGE INTO "a-schema".target_table AS l
USING "a-schema".target_table_store_temp AS r
ON l.key1 = r.key1 AND l.key2 = r.key2
WHEN MATCHED THEN
UPDATE SET boolean = r.boolean, integer = r.integer, number = r.number, string = r.string, flow_document = r.flow_document
WHEN NOT MATCHED THEN
INSERT (key1, key2, boolean, integer, number, string, flow_document)
VALUES (r.key1, r.key2, r.boolean, r.integer, r.number, r.string, r.flow_document)
--- End "a-schema".target_table mergeIntoTarget ---

--- Begin alter table add columns ---
ALTER TABLE "a-schema".target_table ADD COLUMN first_new_column STRING
--- End alter table add columns ---
Expand Down
28 changes: 14 additions & 14 deletions materialize-starburst/.snapshots/TestValidateAndApply
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ Big Schema Changed Types Constraints:
{"Field":"nullField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
{"Field":"numField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'numField' is already being materialized as endpoint type 'DOUBLE' but endpoint type 'BOOLEAN' is required by its schema '{ type: [boolean] }'"}
{"Field":"objField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"stringDateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"stringDateTimeField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"stringDateField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringDateField' is already being materialized as endpoint type 'DATE' but endpoint type 'VARCHAR' is required by its schema '{ type: [string] }'"}
{"Field":"stringDateTimeField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringDateTimeField' is already being materialized as endpoint type 'TIMESTAMP(6) WITH TIME ZONE' but endpoint type 'VARCHAR' is required by its schema '{ type: [string] }'"}
{"Field":"stringDurationField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"stringEmailField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"stringField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringField' is already being materialized as endpoint type 'VARCHAR' but endpoint type 'BIGINT' is required by its schema '{ type: [integer] }'"}
Expand Down Expand Up @@ -120,14 +120,14 @@ Big Schema Materialized Resource Schema With All Fields Required:
{"Name":"arrayfield","Nullable":"YES","Type":"varchar"}
{"Name":"boolfield","Nullable":"YES","Type":"boolean"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"intfield","Nullable":"YES","Type":"bigint"}
{"Name":"key","Nullable":"YES","Type":"varchar"}
{"Name":"multiplefield","Nullable":"YES","Type":"varchar"}
{"Name":"numfield","Nullable":"YES","Type":"double"}
{"Name":"objfield","Nullable":"YES","Type":"varchar"}
{"Name":"stringdatefield","Nullable":"YES","Type":"varchar"}
{"Name":"stringdatetimefield","Nullable":"YES","Type":"varchar"}
{"Name":"stringdatefield","Nullable":"YES","Type":"date"}
{"Name":"stringdatetimefield","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"stringdurationfield","Nullable":"YES","Type":"varchar"}
{"Name":"stringemailfield","Nullable":"YES","Type":"varchar"}
{"Name":"stringfield","Nullable":"YES","Type":"varchar"}
Expand Down Expand Up @@ -158,14 +158,14 @@ Big Schema Materialized Resource Schema With No Fields Required:
{"Name":"arrayfield","Nullable":"YES","Type":"varchar"}
{"Name":"boolfield","Nullable":"YES","Type":"boolean"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"intfield","Nullable":"YES","Type":"bigint"}
{"Name":"key","Nullable":"YES","Type":"varchar"}
{"Name":"multiplefield","Nullable":"YES","Type":"varchar"}
{"Name":"numfield","Nullable":"YES","Type":"double"}
{"Name":"objfield","Nullable":"YES","Type":"varchar"}
{"Name":"stringdatefield","Nullable":"YES","Type":"varchar"}
{"Name":"stringdatetimefield","Nullable":"YES","Type":"varchar"}
{"Name":"stringdatefield","Nullable":"YES","Type":"date"}
{"Name":"stringdatetimefield","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"stringdurationfield","Nullable":"YES","Type":"varchar"}
{"Name":"stringemailfield","Nullable":"YES","Type":"varchar"}
{"Name":"stringfield","Nullable":"YES","Type":"varchar"}
Expand Down Expand Up @@ -235,7 +235,7 @@ Big Schema Materialized Resource Schema Changed Types With Table Replacement:
{"Name":"arrayfield","Nullable":"YES","Type":"varchar"}
{"Name":"boolfield","Nullable":"YES","Type":"bigint"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"intfield","Nullable":"YES","Type":"varchar"}
{"Name":"key","Nullable":"YES","Type":"varchar"}
{"Name":"multiplefield","Nullable":"YES","Type":"varchar"}
Expand Down Expand Up @@ -273,7 +273,7 @@ add a single field:
{"Name":"_meta/flow_truncated","Nullable":"YES","Type":"boolean"}
{"Name":"addedoptionalstring","Nullable":"YES","Type":"varchar"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"key","Nullable":"YES","Type":"varchar"}
{"Name":"optionalboolean","Nullable":"YES","Type":"boolean"}
{"Name":"optionalinteger","Nullable":"YES","Type":"bigint"}
Expand All @@ -287,7 +287,7 @@ add a single field:
remove a single optional field:
{"Name":"_meta/flow_truncated","Nullable":"YES","Type":"boolean"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"key","Nullable":"YES","Type":"varchar"}
{"Name":"optionalboolean","Nullable":"YES","Type":"boolean"}
{"Name":"optionalinteger","Nullable":"YES","Type":"bigint"}
Expand All @@ -301,7 +301,7 @@ remove a single optional field:
remove a single required field:
{"Name":"_meta/flow_truncated","Nullable":"YES","Type":"boolean"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"key","Nullable":"YES","Type":"varchar"}
{"Name":"optionalboolean","Nullable":"YES","Type":"boolean"}
{"Name":"optionalinteger","Nullable":"YES","Type":"bigint"}
Expand All @@ -317,7 +317,7 @@ add and remove many fields:
{"Name":"addedoptionalstring","Nullable":"YES","Type":"varchar"}
{"Name":"addedrequiredstring","Nullable":"YES","Type":"varchar"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"key","Nullable":"YES","Type":"varchar"}
{"Name":"optionalboolean","Nullable":"YES","Type":"boolean"}
{"Name":"optionalinteger","Nullable":"YES","Type":"bigint"}
Expand All @@ -337,7 +337,7 @@ Challenging Field Names Materialized Columns:
{"Name":"_id","Nullable":"YES","Type":"varchar"}
{"Name":"a\"string`with`quote'characters","Nullable":"YES","Type":"varchar"}
{"Name":"flow_document","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"varchar"}
{"Name":"flow_published_at","Nullable":"YES","Type":"timestamp(6) with time zone"}
{"Name":"value with separated words","Nullable":"YES","Type":"varchar"}
{"Name":"value-with-separated-words","Nullable":"YES","Type":"varchar"}
{"Name":"value.with-separated_words","Nullable":"YES","Type":"varchar"}
Expand Down
2 changes: 1 addition & 1 deletion materialize-starburst/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func newClient(_ context.Context, ep *sql.Endpoint) (sql.Client, error) {
return nil, err
}

var templates = renderTemplates(starburstDialect)
var templates = renderTemplates(starburstTrinoDialect)

return &client{
db: db,
Expand Down
Loading

0 comments on commit da6949c

Please sign in to comment.