Merge remote-tracking branch 'origin/main' into stable
iskakaushik committed Jul 12, 2023
2 parents c9bd1a3 + 66f1415 commit 3945c4c
Showing 61 changed files with 2,543 additions and 903 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/flow.yml
@@ -65,3 +65,7 @@ jobs:
AWS_REGION: ${{ secrets.AWS_REGION }}
TEST_BQ_CREDS: ${{ github.workspace }}/bq_service_account.json
TEST_SF_CREDS: ${{ github.workspace }}/snowflake_creds.json
+AZURE_TENANT_ID: ${{ secrets.AZURE_TENANT_ID }}
+AZURE_CLIENT_ID: ${{ secrets.AZURE_CLIENT_ID }}
+AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
+AZURE_SUBSCRIPTION_ID: ${{ secrets.AZURE_SUBSCRIPTION_ID }}
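
These four secrets match the variables that the Azure SDK for Go's azidentity.EnvironmentCredential reads on its own (tenant, client ID, client secret); the subscription ID is consumed separately by management-plane clients. A minimal sketch of how a test harness might pick them up, assuming the flow tests authenticate through azidentity — an assumption, since the consuming code is not part of this diff:

package main

import (
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
)

func main() {
	// EnvironmentCredential reads AZURE_TENANT_ID, AZURE_CLIENT_ID and
	// AZURE_CLIENT_SECRET from the environment by itself.
	cred, err := azidentity.NewEnvironmentCredential(nil)
	if err != nil {
		fmt.Println("azure credentials not configured:", err)
		return
	}
	_ = cred // would be passed to service clients, e.g. Event Hubs

	// The subscription ID is not part of the credential; management
	// clients take it as an explicit argument.
	fmt.Println("subscription:", os.Getenv("AZURE_SUBSCRIPTION_ID"))
}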
4 changes: 2 additions & 2 deletions docker-compose.yml
@@ -73,7 +73,7 @@ services:
- 8112:8112
environment:
TEMPORAL_HOST_PORT: temporalite:7233
-PEERDB_CATALOG_DB: postgres
+PEERDB_CATALOG_DATABASE: postgres
PEERDB_CATALOG_HOST: catalog
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_PORT: 5432
@@ -121,7 +121,7 @@ services:
PEERDB_CATALOG_PORT: 5432
PEERDB_CATALOG_USER: postgres
PEERDB_CATALOG_PASSWORD: postgres
-PEERDB_CATALOG_DB: postgres
+PEERDB_CATALOG_DATABASE: postgres
PEERDB_PASSWORD: peerdb
PEERDB_FLOW_SERVER_ADDRESS: http://flow_api:8112
ports:
24 changes: 17 additions & 7 deletions flow/activities/flowable.go
@@ -79,8 +79,10 @@ func (a *FlowableActivity) CheckConnection(
return nil, fmt.Errorf("failed to get connector: %w", err)
}

+needsSetup := conn.NeedsSetupMetadataTables()
+
return &CheckConnectionResult{
-NeedsSetupMetadataTables: conn.NeedsSetupMetadataTables(),
+NeedsSetupMetadataTables: needsSetup,
}, nil
}

@@ -213,13 +215,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlo
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}

-log.Println("initializing table schema...")
+log.Info("initializing table schema...")
err = dest.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
if err != nil {
return nil, fmt.Errorf("failed to initialize table schema: %w", err)
}

-log.Println("pulling records...")
+log.Info("pulling records...")

records, err := src.PullRecords(&model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
@@ -235,14 +237,20 @@ func (a *FlowableActivity) StartFlo
}

// log the number of records
-log.Printf("pulled %d records", len(records.Records))
+numRecords := len(records.Records)
+log.Printf("pulled %d records", numRecords)

+if numRecords == 0 {
+log.Info("no records to push")
+return nil, nil
+}
+
res, err := dest.SyncRecords(&model.SyncRecordsRequest{
Records: records,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
})

-log.Println("pushed records")
+log.Info("pushed records")

if err != nil {
return nil, fmt.Errorf("failed to push records: %w", err)
@@ -266,7 +274,7 @@ func (a *FlowableActivity) StartNormalize(ctx context.Context, input *protos.Sta
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}

-log.Println("initializing table schema...")
+log.Info("initializing table schema...")
err = dest.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
if err != nil {
return nil, fmt.Errorf("failed to initialize table schema: %w", err)
@@ -280,7 +288,9 @@ func (a *FlowableActivity) StartNormalize(ctx context.Context, input *protos.Sta
}

// log the number of batches normalized
-log.Printf("normalized records from batch %d to batch %d\n", res.StartBatchID, res.EndBatchID)
+if res != nil {
+log.Printf("normalized records from batch %d to batch %d\n", res.StartBatchID, res.EndBatchID)
+}

return res, nil
}
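
With the zero-record short-circuit, StartFlow now returns a nil result together with a nil error, and the new guard in StartNormalize follows the same convention before dereferencing res. A runnable sketch of that pattern, with illustrative names rather than the real activity types:

package main

import "log"

type syncResponse struct{ StartBatchID, EndBatchID int64 }

// startFlow mirrors the convention above: (nil, nil) means
// "nothing to sync", which is not an error.
func startFlow(numRecords int) (*syncResponse, error) {
	if numRecords == 0 {
		log.Println("no records to push")
		return nil, nil
	}
	return &syncResponse{StartBatchID: 1, EndBatchID: 2}, nil
}

func main() {
	res, err := startFlow(0)
	if err != nil {
		log.Fatal(err)
	}
	if res == nil { // guard before dereferencing, as StartNormalize now does
		return
	}
	log.Printf("normalized records from batch %d to batch %d", res.StartBatchID, res.EndBatchID)
}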
61 changes: 61 additions & 0 deletions flow/cmd/api.go
@@ -90,6 +90,43 @@ func (a *APIServer) StartPeerFlow(reqCtx context.Context, input *peerflow.PeerFl
return workflowID, nil
}

+// StartPeerFlowWithConfig starts a peer flow workflow with the given config
+func (a *APIServer) StartPeerFlowWithConfig(
+reqCtx context.Context,
+input *protos.FlowConnectionConfigs) (string, error) {
+workflowID := fmt.Sprintf("%s-peerflow-%s", input.FlowJobName, uuid.New())
+workflowOptions := client.StartWorkflowOptions{
+ID: workflowID,
+TaskQueue: shared.PeerFlowTaskQueue,
+}
+
+maxBatchSize := int(input.MaxBatchSize)
+if maxBatchSize == 0 {
+maxBatchSize = 100000
+}
+
+limits := &peerflow.PeerFlowLimits{
+TotalSyncFlows: 0,
+TotalNormalizeFlows: 0,
+MaxBatchSize: maxBatchSize,
+}
+
+state := peerflow.NewStartedPeerFlowState()
+_, err := a.temporalClient.ExecuteWorkflow(
+reqCtx, // context
+workflowOptions, // workflow start options
+peerflow.PeerFlowWorkflowWithConfig, // workflow function
+input, // workflow input
+limits, // workflow limits
+state, // workflow state
+)
+if err != nil {
+return "", fmt.Errorf("unable to start PeerFlow workflow: %w", err)
+}
+
+return workflowID, nil
+}
+
func genConfigForQRepFlow(config *protos.QRepConfig, flowOptions map[string]interface{},
queryString string, destinationTableIdentifier string) error {
config.InitialCopyOnly = false
@@ -335,6 +372,30 @@ func APIMain(args *APIServerParams) error {
}
})

+r.POST("/flows/start_with_config", func(c *gin.Context) {
+var reqJSON protos.FlowConnectionConfigs
+data, _ := c.GetRawData()
+
+if err := protojson.Unmarshal(data, &reqJSON); err != nil {
+c.JSON(http.StatusBadRequest, gin.H{
+"error": err.Error(),
+})
+return
+}
+
+ctx := c.Request.Context()
+if id, err := apiServer.StartPeerFlowWithConfig(ctx, &reqJSON); err != nil {
+c.JSON(http.StatusInternalServerError, gin.H{
+"error": err.Error(),
+})
+} else {
+c.JSON(http.StatusOK, gin.H{
+"status": "ok",
+"workflow_id": id,
+})
+}
+})
+
r.POST("/qrep/start", func(c *gin.Context) {
var reqJSON protos.QRepConfig
data, _ := c.GetRawData()
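
A client-side sketch of the new endpoint. It assumes the flow API is reachable on port 8112 (the flow_api port from the docker-compose change above) and relies on protojson accepting camelCase field names; the concrete FlowConnectionConfigs fields here are illustrative, since the proto definition is not shown in this diff:

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Hypothetical minimal config; protojson accepts camelCase field names.
	body := []byte(`{"flowJobName":"demo_flow","maxBatchSize":50000}`)

	resp, err := http.Post(
		"http://localhost:8112/flows/start_with_config",
		"application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	out, _ := io.ReadAll(resp.Body)
	// On success: {"status":"ok","workflow_id":"demo_flow-peerflow-<uuid>"}
	fmt.Println(resp.Status, string(out))
}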
2 changes: 1 addition & 1 deletion flow/cmd/main.go
@@ -95,7 +95,7 @@ func main() {
&cli.StringFlag{
Name: "catalog-db",
Value: "postgres",
-EnvVars: []string{"PEERDB_CATALOG_DB"},
+EnvVars: []string{"PEERDB_CATALOG_DATABASE"},
},
temporalHostPortFlag,
},
1 change: 1 addition & 0 deletions flow/cmd/worker.go
@@ -51,6 +51,7 @@ func WorkerMain(opts *WorkerOptions) error {

w := worker.New(c, shared.PeerFlowTaskQueue, worker.Options{})
w.RegisterWorkflow(peerflow.PeerFlowWorkflow)
+w.RegisterWorkflow(peerflow.PeerFlowWorkflowWithConfig)
w.RegisterWorkflow(peerflow.SyncFlowWorkflow)
w.RegisterWorkflow(peerflow.SetupFlowWorkflow)
w.RegisterWorkflow(peerflow.NormalizeFlowWorkflow)
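
Registering PeerFlowWorkflowWithConfig on the worker is the counterpart to the ExecuteWorkflow call in api.go above: a Temporal worker only executes workflow types that have been registered on its task queue, so without this line the new API route would start a workflow that no worker could pick up.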
7 changes: 1 addition & 6 deletions flow/connectors/bigquery/bigquery.go
@@ -978,7 +978,7 @@ func (m *MergeStmtGenerator) generateFlattenedCTE() string {
castStmt = fmt.Sprintf("CAST(JSON_EXTRACT(_peerdb_data, '$.%s') AS %s) AS %s",
colName, bqType, colName)
// expecting data in BASE64 format
-case qvalue.QValueKindBytes:
+case qvalue.QValueKindBytes, qvalue.QValueKindBit:
castStmt = fmt.Sprintf("FROM_BASE64(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s')) AS %s",
colName, colName)
// MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64)
@@ -991,11 +991,6 @@ func (m *MergeStmtGenerator) generateFlattenedCTE() string {
// "CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s.Days') AS INT64),0,0,"+
// "CAST(CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s.Microseconds') AS INT64)/1000000 AS INT64)) AS %s",
// colName, colName, colName, colName)
-case qvalue.QValueKindBit:
-// sample raw data for BIT {"a":{"Bytes":"oA==","Len":3,"Valid":true},"id":1}
-// need to check correctness TODO
-castStmt = fmt.Sprintf("FROM_BASE64(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s.Bytes')) AS %s",
-colName, colName)
// TODO add proper granularity for time types, then restore this
// case model.ColumnTypeTime:
// castStmt = fmt.Sprintf("time(timestamp_micros(CAST(JSON_EXTRACT(_peerdb_data, '$.%s.Microseconds')"+
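
Folding QValueKindBit into the QValueKindBytes branch means both kinds now emit the same BASE64-decoding cast in the flattened CTE, replacing the earlier '$.%s.Bytes' extraction that was flagged as unverified. A standalone sketch of the generated expression, using a hypothetical column named a:

package main

import "fmt"

func main() {
	colName := "a" // hypothetical column name
	// Same Sprintf as the merged BYTES/BIT branch above.
	castStmt := fmt.Sprintf(
		"FROM_BASE64(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s')) AS %s",
		colName, colName)
	fmt.Println(castStmt)
	// Output: FROM_BASE64(JSON_EXTRACT_SCALAR(_peerdb_data, '$.a')) AS a
}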
7 changes: 6 additions & 1 deletion flow/connectors/bigquery/qrecord_value_saver.go
@@ -48,6 +48,11 @@ func (q QRecordValueSaver) Save() (map[string]bigquery.Value, string, error) {

for i, v := range q.Record.Entries {
k := q.ColumnNames[i]
+if v.Value == nil {
+bqValues[k] = nil
+continue
+}
+
switch v.Kind {
case qvalue.QValueKindFloat32:
val, ok := v.Value.(float32)
@@ -113,7 +118,7 @@

bqValues[k] = RatToBigQueryNumeric(val)

-case qvalue.QValueKindBytes:
+case qvalue.QValueKindBytes, qvalue.QValueKindBit:
val, ok := v.Value.([]byte)
if !ok {
return nil, "", fmt.Errorf("failed to convert %v to []byte", v.Value)
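
The new nil check keeps NULL column values from reaching the type switch, where each branch performs a type assertion that would fail on a nil interface value. A simplified, runnable sketch of the guard-then-assert shape — the map types here stand in for the real QValue entries:

package main

import "fmt"

func save(values map[string]interface{}) (map[string]interface{}, error) {
	out := make(map[string]interface{})
	for k, v := range values {
		if v == nil {
			out[k] = nil // surfaces as SQL NULL downstream
			continue
		}
		b, ok := v.([]byte) // this assertion would fail if v were nil
		if !ok {
			return nil, fmt.Errorf("failed to convert %v to []byte", v)
		}
		out[k] = b
	}
	return out, nil
}

func main() {
	res, err := save(map[string]interface{}{"a": []byte("oA=="), "b": nil})
	fmt.Println(res, err)
}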
5 changes: 3 additions & 2 deletions flow/connectors/core.go
@@ -5,6 +5,7 @@ import (
"fmt"

connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
+conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -17,8 +18,6 @@ type Connector interface {
NeedsSetupMetadataTables() bool
SetupMetadataTables() error
GetLastOffset(jobName string) (*protos.LastSyncState, error)
-GetLastSyncBatchID(jobName string) (int64, error)
-GetLastNormalizeBatchID(jobName string) (int64, error)

// GetTableSchema returns the schema of a table.
GetTableSchema(req *protos.GetTableSchemaInput) (*protos.TableSchema, error)
@@ -86,6 +85,8 @@ func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) {
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
+case *protos.Peer_EventhubConfig:
+return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig())
default:
return nil, fmt.Errorf("requested connector is not yet implemented")
}