Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jul 17, 2024
2 parents f499201 + 3caa312 commit 2dbbefa
Show file tree
Hide file tree
Showing 13 changed files with 142 additions and 142 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/stable-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Stable Docker images
on:
push:
tags:
- "v[0-9]+.[0-9]+.[0-9]+"
- "v[0-9]+.[0-9]+.[0-9]+[a-zA-Z]*"

jobs:
docker-build:
Expand Down
83 changes: 45 additions & 38 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
})
defer shutdown()

dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings))
for _, v := range options.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
Expand All @@ -116,11 +110,19 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
batchSize = 1_000_000
}

lastOffset, err := dstConn.GetLastOffset(ctx, config.FlowJobName)
lastOffset, err := func() (int64, error) {
dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return 0, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

return dstConn.GetLastOffset(ctx, config.FlowJobName)
}()
if err != nil {
return nil, err
}
connectors.CloseConnector(ctx, dstConn)

logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))
consumedOffset := atomic.Int64{}
consumedOffset.Store(lastOffset)
Expand Down Expand Up @@ -161,11 +163,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
hasRecords := !recordBatchSync.WaitAndCheckEmpty()
logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))

dstConn, err = connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
}

if !hasRecords {
// wait for the pull goroutine to finish
if err := errGroup.Wait(); err != nil {
Expand All @@ -178,8 +175,13 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}
logger.Info("no records to push")

err := dstConn.ReplayTableSchemaDeltas(ctx, flowName, recordBatchSync.SchemaDeltas)
dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

if err := dstConn.ReplayTableSchemaDeltas(ctx, flowName, recordBatchSync.SchemaDeltas); err != nil {
return nil, fmt.Errorf("failed to sync schema: %w", err)
}

Expand All @@ -195,6 +197,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
var syncStartTime time.Time
var res *model.SyncResponse
errGroup.Go(func() error {
dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return fmt.Errorf("failed to recreate destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName)
if err != nil {
return err
Expand Down Expand Up @@ -344,13 +352,6 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))

srcConn, err := connectors.GetByNameAs[TPull](ctx, a.CatalogPool, config.SourceName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
Expand Down Expand Up @@ -383,6 +384,13 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
var rowsSynced int
errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
srcConn, err := connectors.GetByNameAs[TPull](ctx, a.CatalogPool, config.SourceName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

tmp, err := pullRecords(srcConn, errCtx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
Expand Down Expand Up @@ -443,17 +451,6 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
logger := activity.GetLogger(ctx)

startTime := time.Now()
srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, a.CatalogPool, config.SourceName)
if err != nil {
return 0, fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return 0, fmt.Errorf("failed to get qrep destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

logger.Info("replicating xmin")
shutdown := heartbeatRoutine(ctx, func() string {
Expand All @@ -466,6 +463,12 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
var currentSnapshotXmin int64
var rowsSynced int
errGroup.Go(func() error {
srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, a.CatalogPool, config.SourceName)
if err != nil {
return fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

var pullErr error
var numRecords int
numRecords, currentSnapshotXmin, pullErr = pullRecords(srcConn, ctx, config, partition, stream)
Expand Down Expand Up @@ -495,14 +498,13 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
return updateErr
}

err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, startTime)
if err != nil {
if err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, startTime); err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}

err = monitoring.UpdatePullEndTimeAndRowsForPartition(
errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
if err != nil {
if err := monitoring.UpdatePullEndTimeAndRowsForPartition(
errCtx, a.CatalogPool, runUUID, partition, int64(numRecords),
); err != nil {
logger.Error(err.Error())
return err
}
Expand All @@ -511,7 +513,12 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
})

errGroup.Go(func() error {
var err error
dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return fmt.Errorf("failed to get qrep destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

rowsSynced, err = syncRecords(dstConn, ctx, config, partition, outstream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
Expand Down
43 changes: 14 additions & 29 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"strings"
"time"

_ "github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
Expand All @@ -30,7 +31,7 @@ func (c *ClickhouseConnector) getRawTableName(flowJobName string) string {

func (c *ClickhouseConnector) checkIfTableExists(ctx context.Context, databaseName string, tableIdentifier string) (bool, error) {
var result sql.NullInt32
err := c.database.QueryRowContext(ctx, checkIfTableExistsSQL, databaseName, tableIdentifier).Scan(&result)
err := c.database.QueryRow(ctx, checkIfTableExistsSQL, databaseName, tableIdentifier).Scan(&result)
if err != nil {
return false, fmt.Errorf("error while reading result row: %w", err)
}
Expand All @@ -56,7 +57,7 @@ func (c *ClickhouseConnector) CreateRawTable(ctx context.Context, req *protos.Cr
_peerdb_unchanged_toast_columns String
) ENGINE = ReplacingMergeTree ORDER BY _peerdb_uid;`

_, err := c.execWithLogging(ctx,
err := c.execWithLogging(ctx,
fmt.Sprintf(createRawTableSQL, rawTableName))
if err != nil {
return nil, fmt.Errorf("unable to create raw table: %w", err)
Expand Down Expand Up @@ -129,18 +130,6 @@ func (c *ClickhouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJ
return nil
}

tableSchemaModifyTx, err := c.database.Begin()
if err != nil {
return fmt.Errorf("error starting transaction for schema modification: %w",
err)
}
defer func() {
deferErr := tableSchemaModifyTx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
c.logger.Error("error rolling back transaction for table schema modification", "error", deferErr)
}
}()

for _, schemaDelta := range schemaDeltas {
if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
continue
Expand All @@ -152,9 +141,9 @@ func (c *ClickhouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJ
return fmt.Errorf("failed to convert column type %s to clickhouse type: %w",
addedColumn.Type, err)
}
_, err = c.execWithLoggingTx(ctx,
err = c.execWithLogging(ctx,
fmt.Sprintf("ALTER TABLE %s ADD COLUMN IF NOT EXISTS \"%s\" %s",
schemaDelta.DstTableName, addedColumn.Name, clickhouseColType), tableSchemaModifyTx)
schemaDelta.DstTableName, addedColumn.Name, clickhouseColType))
if err != nil {
return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.Name,
schemaDelta.DstTableName, err)
Expand All @@ -166,22 +155,18 @@ func (c *ClickhouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJ
}
}

err = tableSchemaModifyTx.Commit()
if err != nil {
return fmt.Errorf("failed to commit transaction for table schema modification: %w",
err)
}

return nil
}

func (c *ClickhouseConnector) RenameTables(ctx context.Context, req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
for _, renameRequest := range req.RenameTableOptions {
if req.SyncedAtColName != "" {
syncedAtCol := strings.ToLower(req.SyncedAtColName)
_, err := c.execWithLogging(ctx,
fmt.Sprintf("ALTER TABLE %s UPDATE %s=now() WHERE true SETTINGS allow_nondeterministic_mutations=1",
renameRequest.CurrentName, syncedAtCol))
// get the current timestamp in UTC which can be used as SQL's now()
currentTimestamp := time.Now().UTC().Format("2006-01-02 15:04:05")
err := c.execWithLogging(ctx,
fmt.Sprintf("ALTER TABLE %s UPDATE %s='%s' WHERE true",
renameRequest.CurrentName, syncedAtCol, currentTimestamp))
if err != nil {
return nil, fmt.Errorf("unable to set synced at column for table %s: %w",
renameRequest.CurrentName, err)
Expand All @@ -196,7 +181,7 @@ func (c *ClickhouseConnector) RenameTables(ctx context.Context, req *protos.Rena
allCols := strings.Join(columnNames, ",")
pkeyCols := strings.Join(renameRequest.TableSchema.PrimaryKeyColumns, ",")
c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", renameRequest.NewName))
_, err := c.execWithLogging(ctx,
err := c.execWithLogging(ctx,
fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)",
renameRequest.CurrentName, fmt.Sprintf("%s,%s", allCols, signColName), allCols,
signColName,
Expand All @@ -206,13 +191,13 @@ func (c *ClickhouseConnector) RenameTables(ctx context.Context, req *protos.Rena
}

// drop the dst table if exists
_, err = c.execWithLogging(ctx, "DROP TABLE IF EXISTS "+renameRequest.NewName)
err = c.execWithLogging(ctx, "DROP TABLE IF EXISTS "+renameRequest.NewName)
if err != nil {
return nil, fmt.Errorf("unable to drop table %s: %w", renameRequest.NewName, err)
}

// rename the src table to dst
_, err = c.execWithLogging(ctx, fmt.Sprintf("RENAME TABLE %s TO %s",
err = c.execWithLogging(ctx, fmt.Sprintf("RENAME TABLE %s TO %s",
renameRequest.CurrentName,
renameRequest.NewName))
if err != nil {
Expand All @@ -237,7 +222,7 @@ func (c *ClickhouseConnector) SyncFlowCleanup(ctx context.Context, jobName strin

// delete raw table if exists
rawTableIdentifier := c.getRawTableName(jobName)
_, err = c.execWithLogging(ctx, fmt.Sprintf(dropTableIfExistsSQL, rawTableIdentifier))
err = c.execWithLogging(ctx, fmt.Sprintf(dropTableIfExistsSQL, rawTableIdentifier))
if err != nil {
return fmt.Errorf("[clickhouse] unable to drop raw table: %w", err)
}
Expand Down
Loading

0 comments on commit 2dbbefa

Please sign in to comment.