Skip to content

Commit

Permalink
Snowflake WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
shayonj committed Feb 2, 2025
1 parent 798a636 commit 34963f2
Show file tree
Hide file tree
Showing 7 changed files with 1,415 additions and 15 deletions.
44 changes: 43 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ var (
Run: runWorker,
}

snowflakeWorkerCmd = &cobra.Command{
Use: "snowflake",
Short: "Start the worker with Snowflake sink",
Run: runWorker,
}

webhookWorkerCmd = &cobra.Command{
Use: "webhook",
Short: "Start the worker with webhook sink",
Expand Down Expand Up @@ -98,7 +104,7 @@ func init() {
addReplicatorFlags(replicatorCmd)
addWorkerFlags(workerCmd)

workerCmd.AddCommand(stdoutWorkerCmd, fileWorkerCmd, postgresWorkerCmd, webhookWorkerCmd)
workerCmd.AddCommand(stdoutWorkerCmd, fileWorkerCmd, postgresWorkerCmd, webhookWorkerCmd, snowflakeWorkerCmd)
rootCmd.AddCommand(replicatorCmd, workerCmd, versionCmd)

cobra.OnInitialize(func() {
Expand All @@ -117,6 +123,11 @@ func init() {
"source-host", "source-dbname", "source-user", "source-password",
)
markFlagRequired(webhookWorkerCmd, "webhook-url")
markFlagRequired(snowflakeWorkerCmd,
"snowflake-account", "snowflake-user", "snowflake-password",
"snowflake-warehouse", "snowflake-database", "snowflake-schema",
"source-host", "source-dbname", "source-user", "source-password",
)
}
})
}
Expand Down Expand Up @@ -479,6 +490,21 @@ func createSink(sinkType string) (sinks.Sink, error) {
return sinks.NewWebhookSink(
viper.GetString("webhook-url"),
)
case "snowflake":
return sinks.NewSnowflakeSink(
viper.GetString("snowflake-account"),
viper.GetString("snowflake-user"),
viper.GetString("snowflake-password"),
viper.GetString("snowflake-role"),
viper.GetString("snowflake-warehouse"),
viper.GetString("snowflake-database"),
viper.GetString("snowflake-schema"),
viper.GetString("source-host"),
viper.GetInt("source-port"),
viper.GetString("source-dbname"),
viper.GetString("source-user"),
viper.GetString("source-password"),
)
default:
return nil, fmt.Errorf("unknown sink type: %s", sinkType)
}
Expand Down Expand Up @@ -522,4 +548,20 @@ func addWorkerFlags(cmd *cobra.Command) {

// Webhook sink specific flags
webhookWorkerCmd.Flags().String("webhook-url", "", "Webhook URL (env: PG_FLO_WEBHOOK_URL)")

// Snowflake sink specific flags
snowflakeWorkerCmd.Flags().String("snowflake-account", "", "Snowflake account (env: PG_FLO_SNOWFLAKE_ACCOUNT)")
snowflakeWorkerCmd.Flags().String("snowflake-user", "", "Snowflake user (env: PG_FLO_SNOWFLAKE_USER)")
snowflakeWorkerCmd.Flags().String("snowflake-password", "", "Snowflake password (env: PG_FLO_SNOWFLAKE_PASSWORD)")
snowflakeWorkerCmd.Flags().String("snowflake-role", "ACCOUNTADMIN", "Snowflake role (env: PG_FLO_SNOWFLAKE_ROLE)")
snowflakeWorkerCmd.Flags().String("snowflake-warehouse", "", "Snowflake warehouse (env: PG_FLO_SNOWFLAKE_WAREHOUSE)")
snowflakeWorkerCmd.Flags().String("snowflake-database", "", "Snowflake database (env: PG_FLO_SNOWFLAKE_DATABASE)")
snowflakeWorkerCmd.Flags().String("snowflake-schema", "PUBLIC", "Snowflake schema (env: PG_FLO_SNOWFLAKE_SCHEMA)")

// Add source database connection flags for Snowflake
snowflakeWorkerCmd.Flags().String("source-host", "", "Source PostgreSQL host (env: PG_FLO_SOURCE_HOST)")
snowflakeWorkerCmd.Flags().Int("source-port", 5432, "Source PostgreSQL port (env: PG_FLO_SOURCE_PORT)")
snowflakeWorkerCmd.Flags().String("source-dbname", "", "Source PostgreSQL database name (env: PG_FLO_SOURCE_DBNAME)")
snowflakeWorkerCmd.Flags().String("source-user", "", "Source PostgreSQL user (env: PG_FLO_SOURCE_USER)")
snowflakeWorkerCmd.Flags().String("source-password", "", "Source PostgreSQL password (env: PG_FLO_SOURCE_PASSWORD)")
}
41 changes: 41 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/nats-io/nats.go v1.38.0
github.com/rs/zerolog v1.33.0
github.com/shopspring/decimal v1.4.0
github.com/snowflakedb/gosnowflake v1.12.0
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.19.0
Expand All @@ -18,36 +19,76 @@ require (
)

require (
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 // indirect
github.com/BurntSushi/toml v1.4.0 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/apache/arrow/go/v15 v15.0.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.26.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.11 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/nats-io/nkeys v0.4.9 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/sagikazarmark/locafero v0.6.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/term v0.27.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/tools v0.24.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 34963f2

Please sign in to comment.