From 34963f28e5125b2de86b740d2c614b2bebea88c9 Mon Sep 17 00:00:00 2001 From: Shayon Mukherjee Date: Sun, 24 Nov 2024 13:14:30 -0500 Subject: [PATCH] Snowflake WIP --- cmd/root.go | 44 +- go.mod | 41 ++ go.sum | 118 ++++ internal/scripts/e2e_snowflake.sh | 274 +++++++++ internal/scripts/e2e_test_local.sh | 6 +- pkg/sinks/snowflake.go | 920 +++++++++++++++++++++++++++++ pkg/utils/cdc_encoding.go | 27 +- 7 files changed, 1415 insertions(+), 15 deletions(-) create mode 100755 internal/scripts/e2e_snowflake.sh create mode 100644 pkg/sinks/snowflake.go diff --git a/cmd/root.go b/cmd/root.go index cbabab0..61137fa 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -61,6 +61,12 @@ var ( Run: runWorker, } + snowflakeWorkerCmd = &cobra.Command{ + Use: "snowflake", + Short: "Start the worker with Snowflake sink", + Run: runWorker, + } + webhookWorkerCmd = &cobra.Command{ Use: "webhook", Short: "Start the worker with webhook sink", @@ -98,7 +104,7 @@ func init() { addReplicatorFlags(replicatorCmd) addWorkerFlags(workerCmd) - workerCmd.AddCommand(stdoutWorkerCmd, fileWorkerCmd, postgresWorkerCmd, webhookWorkerCmd) + workerCmd.AddCommand(stdoutWorkerCmd, fileWorkerCmd, postgresWorkerCmd, webhookWorkerCmd, snowflakeWorkerCmd) rootCmd.AddCommand(replicatorCmd, workerCmd, versionCmd) cobra.OnInitialize(func() { @@ -117,6 +123,11 @@ func init() { "source-host", "source-dbname", "source-user", "source-password", ) markFlagRequired(webhookWorkerCmd, "webhook-url") + markFlagRequired(snowflakeWorkerCmd, + "snowflake-account", "snowflake-user", "snowflake-password", + "snowflake-warehouse", "snowflake-database", "snowflake-schema", + "source-host", "source-dbname", "source-user", "source-password", + ) } }) } @@ -479,6 +490,21 @@ func createSink(sinkType string) (sinks.Sink, error) { return sinks.NewWebhookSink( viper.GetString("webhook-url"), ) + case "snowflake": + return sinks.NewSnowflakeSink( + viper.GetString("snowflake-account"), + viper.GetString("snowflake-user"), + viper.GetString("snowflake-password"), + viper.GetString("snowflake-role"), + viper.GetString("snowflake-warehouse"), + viper.GetString("snowflake-database"), + viper.GetString("snowflake-schema"), + viper.GetString("source-host"), + viper.GetInt("source-port"), + viper.GetString("source-dbname"), + viper.GetString("source-user"), + viper.GetString("source-password"), + ) default: return nil, fmt.Errorf("unknown sink type: %s", sinkType) } @@ -522,4 +548,20 @@ func addWorkerFlags(cmd *cobra.Command) { // Webhook sink specific flags webhookWorkerCmd.Flags().String("webhook-url", "", "Webhook URL (env: PG_FLO_WEBHOOK_URL)") + + // Snowflake sink specific flags + snowflakeWorkerCmd.Flags().String("snowflake-account", "", "Snowflake account (env: PG_FLO_SNOWFLAKE_ACCOUNT)") + snowflakeWorkerCmd.Flags().String("snowflake-user", "", "Snowflake user (env: PG_FLO_SNOWFLAKE_USER)") + snowflakeWorkerCmd.Flags().String("snowflake-password", "", "Snowflake password (env: PG_FLO_SNOWFLAKE_PASSWORD)") + snowflakeWorkerCmd.Flags().String("snowflake-role", "ACCOUNTADMIN", "Snowflake role (env: PG_FLO_SNOWFLAKE_ROLE)") + snowflakeWorkerCmd.Flags().String("snowflake-warehouse", "", "Snowflake warehouse (env: PG_FLO_SNOWFLAKE_WAREHOUSE)") + snowflakeWorkerCmd.Flags().String("snowflake-database", "", "Snowflake database (env: PG_FLO_SNOWFLAKE_DATABASE)") + snowflakeWorkerCmd.Flags().String("snowflake-schema", "PUBLIC", "Snowflake schema (env: PG_FLO_SNOWFLAKE_SCHEMA)") + + // Add source database connection flags for Snowflake + snowflakeWorkerCmd.Flags().String("source-host", "", "Source PostgreSQL host (env: PG_FLO_SOURCE_HOST)") + snowflakeWorkerCmd.Flags().Int("source-port", 5432, "Source PostgreSQL port (env: PG_FLO_SOURCE_PORT)") + snowflakeWorkerCmd.Flags().String("source-dbname", "", "Source PostgreSQL database name (env: PG_FLO_SOURCE_DBNAME)") + snowflakeWorkerCmd.Flags().String("source-user", "", "Source PostgreSQL user (env: PG_FLO_SOURCE_USER)") + snowflakeWorkerCmd.Flags().String("source-password", "", "Source PostgreSQL password (env: PG_FLO_SOURCE_PASSWORD)") } diff --git a/go.mod b/go.mod index d3a5bc7..99589e2 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/nats-io/nats.go v1.38.0 github.com/rs/zerolog v1.33.0 github.com/shopspring/decimal v1.4.0 + github.com/snowflakedb/gosnowflake v1.12.0 github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.19.0 @@ -18,36 +19,76 @@ require ( ) require ( + github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect + github.com/99designs/keyring v1.2.2 // indirect + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 // indirect + github.com/BurntSushi/toml v1.4.0 // indirect + github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect + github.com/apache/arrow/go/v15 v15.0.0 // indirect + github.com/aws/aws-sdk-go-v2 v1.26.1 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.11 // indirect + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.15 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1 // indirect + github.com/aws/smithy-go v1.20.2 // indirect + github.com/danieljoos/wincred v1.1.2 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dvsekhvalnov/jose2go v1.6.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/gabriel-vasile/mimetype v1.4.2 // indirect + github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect + github.com/golang-jwt/jwt/v5 v5.2.1 // indirect + github.com/google/flatbuffers v23.5.26+incompatible // indirect + github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/mtibben/percent v0.2.1 // indirect github.com/nats-io/nkeys v0.4.9 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/sagikazarmark/locafero v0.6.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.7.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/subosito/gotenv v1.6.0 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.31.0 // indirect golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa // indirect + golang.org/x/mod v0.20.0 // indirect + golang.org/x/net v0.28.0 // indirect golang.org/x/sync v0.10.0 // indirect golang.org/x/sys v0.28.0 // indirect + golang.org/x/term v0.27.0 // indirect golang.org/x/text v0.21.0 // indirect + golang.org/x/tools v0.24.0 // indirect + golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index f83317e..a8460d8 100644 --- a/go.sum +++ b/go.sum @@ -1,29 +1,107 @@ +github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs= +github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4= +github.com/99designs/keyring v1.2.2 h1:pZd3neh/EmUzWONb35LxQfvuY7kiSXAq3HQd97+XBn0= +github.com/99designs/keyring v1.2.2/go.mod h1:wes/FrByc8j7lFOAGLGSNEg8f/PaI3cgTBqhFkHUrPk= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0 h1:rTnT/Jrcm+figWlYz4Ixzt0SJVR2cMC8lvZcimipiEY= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0/go.mod h1:ON4tFdPTwRcgWEaVDrN3584Ef+b7GgSJaXxe5fW9t4M= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 h1:u/LLAOFgsMv7HmNL4Qufg58y+qElGOt5qv0z1mURkRY= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0/go.mod h1:2e8rMJtl2+2j+HXbTBwnyGpm5Nou7KhvSfxOq8JpTag= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= +github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= +github.com/apache/arrow/go/v15 v15.0.0 h1:1zZACWf85oEZY5/kd9dsQS7i+2G5zVQcbKTHgslqHNA= +github.com/apache/arrow/go/v15 v15.0.0/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA= +github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= +github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 h1:x6xsQXGSmW6frevwDA+vi/wqhp1ct18mVXYN08/93to= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2/go.mod h1:lPprDr1e6cJdyYeGXnRaJoP4Md+cDBvi2eOj00BlGmg= +github.com/aws/aws-sdk-go-v2/config v1.27.11 h1:f47rANd2LQEYHda2ddSCKYId18/8BhSRM4BULGmfgNA= +github.com/aws/aws-sdk-go-v2/config v1.27.11/go.mod h1:SMsV78RIOYdve1vf36z8LmnszlRWkwMQtomCAI0/mIE= +github.com/aws/aws-sdk-go-v2/credentials v1.17.11 h1:YuIB1dJNf1Re822rriUOTxopaHHvIq0l/pX3fwO+Tzs= +github.com/aws/aws-sdk-go-v2/credentials v1.17.11/go.mod h1:AQtFPsDH9bI2O+71anW6EKL+NcD7LG3dpKGMV4SShgo= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 h1:FVJ0r5XTHSmIHJV6KuDmdYhEpvlHpiSd38RQWhut5J4= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1/go.mod h1:zusuAeqezXzAB24LGuzuekqMAEgWkVYukBec3kr3jUg= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.15 h1:7Zwtt/lP3KNRkeZre7soMELMGNoBrutx8nobg1jKWmo= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.15/go.mod h1:436h2adoHb57yd+8W+gYPrrA9U/R/SuAuOO42Ushzhw= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5 h1:81KE7vaZzrl7yHBYHVEzYB8sypz11NMOZ40YlWvPxsU= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5/go.mod h1:LIt2rg7Mcgn09Ygbdh/RdIm0rQ+3BNkbP1gyVMFtRK0= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 h1:ZMeFZ5yk+Ek+jNr1+uwCd2tG89t6oTS5yVWpa6yy2es= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7/go.mod h1:mxV05U+4JiHqIpGqqYXOHLPKUC6bDXC44bsUhNjOEwY= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 h1:ogRAwT1/gxJBcSWDMZlgyFUM962F51A5CRhDLbxLdmo= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7/go.mod h1:YCsIZhXfRPLFFCl5xxY+1T9RKzOKjCut+28JSX2DnAk= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 h1:f9RyWNtS8oH7cZlbn+/JNPpjUk5+5fLd5lM9M0i49Ys= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5/go.mod h1:h5CoMZV2VF297/VLhRhO1WF+XYWOzXo+4HsObA4HjBQ= +github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1 h1:6cnno47Me9bRykw9AEv9zkXE+5or7jz8TsskTTccbgc= +github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1/go.mod h1:qmdkIIAC+GCLASF7R2whgNrJADz0QZPX+Seiw/i4S3o= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 h1:vN8hEbpRnL7+Hopy9dzmRle1xmDc7o8tmY0klsr175w= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.5/go.mod h1:qGzynb/msuZIE8I75DVRCUXw3o3ZyBmUvMwQ2t/BrGM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 h1:Jux+gDDyi1Lruk+KHF91tK2KCuY61kzoCpvtvJJBtOE= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4/go.mod h1:mUYPBhaF2lGiukDEjJX2BLRRKTmoUSitGDUgM4tRxak= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 h1:cwIxeBttqPN3qkaAjcEcsh8NYr8n2HZPkcKgPAi1phU= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.6/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw= +github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= +github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0= +github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= +github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= +github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY= +github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= +github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/goccy/go-json v0.10.4 h1:JSwxQzIqKfmFX1swYPpUThQZp/Ka4wzJdK0LWVytLPM= github.com/goccy/go-json v0.10.4/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0= +github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= +github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg= +github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= +github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -85,9 +163,15 @@ github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dv github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= +github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -98,6 +182,8 @@ github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= @@ -118,14 +204,21 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= +github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= +github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -152,6 +245,10 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/snowflakedb/gosnowflake v1.12.0 h1:Saez8egtn5xAoVMBxFaMu9MYfAG9SS9dpAEXD1/ECIo= +github.com/snowflakedb/gosnowflake v1.12.0/go.mod h1:wHfYmZi3zvtWItojesAhWWXBN7+niex2R1h/S7QCZYg= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= @@ -186,6 +283,10 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -221,6 +322,8 @@ golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKG golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0= +golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -230,6 +333,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -247,7 +352,10 @@ golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -263,6 +371,8 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= +golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -285,13 +395,20 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= +golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o= +gonum.org/v1/gonum v0.12.0/go.mod h1:73TDxJfAAHeA8Mk9mf8NlIppyhQNo5GLTcYeqgo2lvY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= @@ -299,6 +416,7 @@ gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:a gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/scripts/e2e_snowflake.sh b/internal/scripts/e2e_snowflake.sh new file mode 100755 index 0000000..b692ed6 --- /dev/null +++ b/internal/scripts/e2e_snowflake.sh @@ -0,0 +1,274 @@ +#!/bin/bash +set -euo pipefail + +source "$(dirname "$0")/e2e_common.sh" + +create_test_table() { + log "Creating test table in PostgreSQL..." + run_sql "DROP TABLE IF EXISTS public.data_type_test;" + run_sql "CREATE TABLE public.data_type_test ( + id serial PRIMARY KEY NOT NULL, + text_data text NOT NULL, + binary_data bytea NOT NULL, + timestamp_data timestamp with time zone NOT NULL, + float_data double precision NOT NULL, + integer_data integer NOT NULL, + smallint_data smallint NOT NULL, + bigint_data bigint NOT NULL, + boolean_data boolean NOT NULL, + json_data json NOT NULL, + jsonb_data jsonb NOT NULL, + array_text_data text[] NOT NULL, + array_int_data integer[] NOT NULL, + array_bytea_data bytea[] NOT NULL, + numeric_data numeric(10, 2) NOT NULL, + uuid_data uuid NOT NULL, + inet_data inet NOT NULL, + cidr_data cidr NOT NULL, + macaddr_data macaddr NOT NULL, + interval_data interval NOT NULL + );" + success "Test table created" +} + +start_pg_flo_replication() { + log "Starting pg_flo replicator..." + if [ -f "$pg_flo_LOG" ]; then + mv "$pg_flo_LOG" "${pg_flo_LOG}.bak" + log "Backed up previous replicator log to ${pg_flo_LOG}.bak" + fi + + cat >"/tmp/pg_flo_replicator.yml" <"$pg_flo_LOG" 2>&1 & + pg_flo_PID=$! + log "pg_flo replicator started with PID: $pg_flo_PID" + success "pg_flo replicator started" +} + +start_pg_flo_worker() { + log "Starting pg_flo worker with Snowflake sink..." + if [ -f "$pg_flo_WORKER_LOG" ]; then + mv "$pg_flo_WORKER_LOG" "${pg_flo_WORKER_LOG}.bak" + log "Backed up previous worker log to ${pg_flo_WORKER_LOG}.bak" + fi + + # Create worker config file + cat >"/tmp/pg_flo_worker.yml" <"$pg_flo_WORKER_LOG" 2>&1 & + pg_flo_WORKER_PID=$! + log "pg_flo worker started with PID: $pg_flo_WORKER_PID" + success "pg_flo worker started" +} + +populate_initial_data() { + log "Populating initial data..." + run_sql "INSERT INTO public.data_type_test ( + text_data, + binary_data, + timestamp_data, + float_data, + integer_data, + smallint_data, + bigint_data, + boolean_data, + json_data, + jsonb_data, + array_text_data, + array_int_data, + array_bytea_data, + numeric_data, + uuid_data, + inet_data, + cidr_data, + macaddr_data, + interval_data + ) SELECT + 'TEXT' || generate_series, + decode('deadbeef', 'hex'), + current_timestamp + (generate_series || ' minutes')::interval, + random() * 100, + generate_series, + generate_series % 32767, + generate_series, + (random() > 0.5), + json_build_object('key', 'value' || generate_series, 'number', generate_series), + jsonb_build_object('key', 'value' || generate_series, 'number', generate_series), + ARRAY['a'||generate_series, 'b'||generate_series, 'c'||generate_series], + ARRAY[generate_series, generate_series+1, generate_series+2], + ARRAY[decode('deadbeef', 'hex'), decode('beefdead', 'hex')], + random() * 1000, + gen_random_uuid(), + ('192.168.1.' || (generate_series % 255))::inet, + '192.168.1.0/24'::cidr, + '08:00:2b:01:02:03'::macaddr, + (generate_series || ' days')::interval + FROM generate_series(1, 10000);" + + run_sql "ANALYZE public.data_type_test;" + success "Initial data populated" +} + +simulate_concurrent_changes() { + log "Simulating concurrent changes..." + for i in {1..100}; do + run_sql "INSERT INTO public.data_type_test ( + text_data, + binary_data, + timestamp_data, + float_data, + integer_data, + smallint_data, + bigint_data, + boolean_data, + json_data, + jsonb_data, + array_text_data, + array_int_data, + array_bytea_data, + numeric_data, + uuid_data, + inet_data, + cidr_data, + macaddr_data, + interval_data + ) VALUES ( + 'Concurrent text $i', + decode('deadbeef', 'hex'), + current_timestamp, + random() * 100, + $i, + $i, + $i, + true, + '{\"key\": \"concurrent_$i\"}', + '{\"key\": \"concurrent_$i\"}', + ARRAY['a'||$i, 'b'||$i, 'c'||$i], + ARRAY[$i, $i+1, $i+2], + ARRAY[decode('deadbeef', 'hex'), decode('beefdead', 'hex')], + random() * 1000, + gen_random_uuid(), + ('192.168.1.' || ($i % 255))::inet, + '192.168.1.0/24'::cidr, + '08:00:2b:01:02:03'::macaddr, + ($i || ' days')::interval + );" + done + + run_sql "UPDATE public.data_type_test SET + binary_data = decode('deadc0de', 'hex'), + json_data = '{\"key\": \"updated_5000\", \"number\": 5000}', + jsonb_data = '{\"array\": [5000, 5001, 5002], \"nested\": {\"key\": \"updated_5000\"}}', + timestamp_data = CURRENT_TIMESTAMP + WHERE id = 5000;" + + run_sql "DELETE FROM public.data_type_test WHERE id % 1000 = 0;" + success "Concurrent changes simulated" +} + +test_pg_flo_snowflake() { + setup_postgres + create_test_table + populate_initial_data + start_pg_flo_replication + sleep 5 + start_pg_flo_worker + sleep 5 + simulate_concurrent_changes + sleep 45000 + stop_pg_flo_gracefully +} + +log "Starting data setup for Snowflake test..." +test_pg_flo_snowflake +log "Data setup completed" + +# -- Basic row count verification +# SELECT COUNT(*) FROM data_type_test; + +# -- Verify the specific record with binary and JSON updates (ID 5000) +# SELECT +# id, +# binary_data, +# json_data, +# jsonb_data, +# timestamp_data +# FROM data_type_test +# WHERE id = 5000; + +# -- Verify deleted records (should return no rows) +# SELECT id +# FROM data_type_test +# WHERE id IN (1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000); + +# -- Verify concurrent inserts (should show 100 records) +# SELECT +# id, +# text_data, +# binary_data, +# json_data, +# array_int_data, +# timestamp_data +# FROM data_type_test +# WHERE id > 10000 +# ORDER BY id; + +# -- Verify data type conversions for a sample record +# SELECT +# id, +# text_data, +# binary_data, +# timestamp_data, +# float_data, +# integer_data, +# smallint_data, +# bigint_data, +# boolean_data, +# json_data, +# jsonb_data, +# array_text_data, +# array_int_data, +# array_bytea_data, +# numeric_data, +# uuid_data, +# inet_data, +# cidr_data, +# macaddr_data, +# interval_data +# FROM data_type_test +# WHERE id = 42; -- arbitrary row for type checking diff --git a/internal/scripts/e2e_test_local.sh b/internal/scripts/e2e_test_local.sh index 465a05b..3f8d64f 100755 --- a/internal/scripts/e2e_test_local.sh +++ b/internal/scripts/e2e_test_local.sh @@ -33,9 +33,9 @@ make build setup_docker -log "Running e2e ddl tests..." -if CI=false ruby ./internal/scripts/e2e_resume_test.rb; then - success "e2e ddl tests completed successfully" +log "Running e2e snowflake tests..." +if CI=false ./internal/scripts/e2e_copy_and_stream.sh; then + success "e2e snowflake tests completed successfully" else error "Original e2e tests failed" exit 1 diff --git a/pkg/sinks/snowflake.go b/pkg/sinks/snowflake.go new file mode 100644 index 0000000..6bd9a5b --- /dev/null +++ b/pkg/sinks/snowflake.go @@ -0,0 +1,920 @@ +package sinks + +import ( + "context" + "database/sql" + "encoding/hex" + "encoding/json" + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/pgflo/pg_flo/pkg/utils" + + "github.com/rs/zerolog/log" + sf "github.com/snowflakedb/gosnowflake" +) + +type SnowflakeSink struct { + conn *sql.DB + sourceConn *pgx.Conn + connConfig *sf.Config + sourceConfig *pgx.ConnConfig + warehouse string + database string + schema string + logger utils.Logger + retryConfig utils.RetryConfig + tableMetadata map[string]TableMetadata + syncedTables sync.Map +} + +type TableMetadata struct { + PrimaryKeys []string + UniqueKeys [][]string + NotNullCols []string +} + +// Constants for SQL templates +const ( + insertTemplate = "INSERT INTO %s.%s.%s (%s) SELECT %s" + updateTemplate = "UPDATE %s.%s.%s SET %s WHERE %s" + deleteTemplate = "DELETE FROM %s.%s.%s WHERE %s" +) + +// columnExpr represents a column expression with its value +type columnExpr struct { + name string + value interface{} + dataType uint32 + isJSONCol bool +} + +// NewSnowflakeSink creates a new Snowflake sink instance with the provided configuration +func NewSnowflakeSink(account, user, password, role, warehouse, database, schema string, sourceHost string, sourcePort int, sourceDBName, sourceUser, sourcePassword string) (*SnowflakeSink, error) { + log.Debug(). + Str("snowflake_account", account). + Str("database", database). + Str("role", role). + Msg("Initializing Snowflake sink") + + // Initialize source config first + sourceConfig, err := pgx.ParseConfig(fmt.Sprintf("host=%s port=%d dbname=%s user=%s password=%s", + sourceHost, sourcePort, sourceDBName, sourceUser, sourcePassword)) + if err != nil { + return nil, fmt.Errorf("failed to parse source connection config: %w", err) + } + + cfg := &sf.Config{ + Account: account, + User: user, + Password: password, + Role: role, + Database: database, + Schema: schema, + Warehouse: warehouse, + LoginTimeout: 1 * time.Second, + RequestTimeout: 5 * time.Second, + } + + sink := &SnowflakeSink{ + connConfig: cfg, + sourceConfig: sourceConfig, + warehouse: warehouse, + database: database, + schema: schema, + logger: utils.NewZerologLogger(log.With().Str("component", "snowflake_sink").Logger()), + retryConfig: utils.RetryConfig{ + MaxAttempts: 5, + InitialWait: 1 * time.Second, + MaxWait: 30 * time.Second, + }, + tableMetadata: make(map[string]TableMetadata), + } + + if err := sink.connectSource(context.Background()); err != nil { + return nil, fmt.Errorf("failed to connect to source database: %w", err) + } + + if err := sink.loadTableConstraints(); err != nil { + return nil, fmt.Errorf("failed to load table constraints: %w", err) + } + + if err := sink.connect(context.Background()); err != nil { + return nil, err + } + + return sink, nil +} + +// connectSource establishes a connection to the source PostgreSQL database +func (s *SnowflakeSink) connectSource(ctx context.Context) error { + var connMutex sync.Mutex + + return utils.WithRetry(ctx, s.retryConfig, func() error { + conn, err := pgx.ConnectConfig(ctx, s.sourceConfig) + if err != nil { + s.logger.Error().Err(err).Msg("Failed to connect to source database, will retry") + return err + } + connMutex.Lock() + s.sourceConn = conn + connMutex.Unlock() + return nil + }) +} + +// connect establishes a connection to the Snowflake database +func (s *SnowflakeSink) connect(ctx context.Context) error { + var connMutex sync.Mutex + + return utils.WithRetry(ctx, s.retryConfig, func() error { + dsn, err := sf.DSN(s.connConfig) + if err != nil { + return fmt.Errorf("failed to create DSN: %w", err) + } + + db, err := sql.Open("snowflake", dsn) + if err != nil { + s.logger.Error().Err(err).Msg("Failed to connect to Snowflake, will retry") + return err + } + + if err := db.PingContext(ctx); err != nil { + db.Close() + return err + } + + // Set the BINARY_INPUT_FORMAT session parameter + if _, err := db.ExecContext(ctx, "ALTER SESSION SET BINARY_INPUT_FORMAT='HEX'"); err != nil { + db.Close() + return fmt.Errorf("failed to set binary input format: %w", err) + } + + connMutex.Lock() + s.conn = db + connMutex.Unlock() + return nil + }) +} + +// isSnowflakeConnectionError checks if an error is related to connection issues +func isSnowflakeConnectionError(err error) bool { + if err == nil { + return false + } + + errStr := strings.ToUpper(err.Error()) + return strings.Contains(errStr, "CONNECTION") || + strings.Contains(errStr, "NETWORK") || + strings.Contains(errStr, "EOF") || + strings.Contains(errStr, "CLOSED") || + strings.Contains(errStr, "TIMEOUT") +} + +// Close closes the database connections +func (s *SnowflakeSink) Close() error { + if s.conn != nil { + return s.conn.Close() + } + return nil +} + +// syncSchema ensures the target table schema exists in Snowflake. +// It uses syncedTables to track which tables have already been synced. +func (s *SnowflakeSink) syncSchema(ctx context.Context, message *utils.CDCMessage) error { + tableKey := fmt.Sprintf("%s.%s", s.schema, message.Table) + if _, exists := s.syncedTables.Load(tableKey); exists { + return nil + } + + if _, loaded := s.syncedTables.LoadOrStore(tableKey, true); loaded { + return nil + } + + exists, err := s.tableExists(ctx, s.schema, message.Table) + if err != nil { + s.syncedTables.Delete(tableKey) + return fmt.Errorf("failed to check if table exists: %w", err) + } + + if !exists { + key := fmt.Sprintf("%s.%s", s.schema, message.Table) + metadata := s.tableMetadata[key] + + columns := make([]string, 0, len(message.Columns)) + for _, col := range message.Columns { + snowflakeType, err := mapPostgresToSnowflakeType(col.DataType) + if err != nil { + s.syncedTables.Delete(tableKey) + return fmt.Errorf("failed to map type for column %s: %w", col.Name, err) + } + + columnDef := fmt.Sprintf(`%s %s`, quoteIdentifier(col.Name), snowflakeType) + if contains(metadata.NotNullCols, col.Name) { + columnDef += " NOT NULL" + } + columns = append(columns, columnDef) + } + + if len(metadata.PrimaryKeys) > 0 { + pkConstraint := fmt.Sprintf("PRIMARY KEY (%s)", quotedColumns(metadata.PrimaryKeys)) + columns = append(columns, pkConstraint) + } + + for _, uniqueKey := range metadata.UniqueKeys { + uniqueConstraint := fmt.Sprintf("UNIQUE (%s)", quotedColumns(uniqueKey)) + columns = append(columns, uniqueConstraint) + } + + createTableSQL := fmt.Sprintf( + `CREATE TABLE IF NOT EXISTS %s.%s.%s (%s)`, + quoteIdentifier(s.database), + quoteIdentifier(s.schema), + quoteIdentifier(message.Table), + strings.Join(columns, ", "), + ) + + s.logger.Debug(). + Str("sql", createTableSQL). + Msg("Creating table") + + if _, err := s.conn.ExecContext(ctx, createTableSQL); err != nil { + s.syncedTables.Delete(tableKey) + s.logger.Error(). + Err(err). + Str("sql", createTableSQL). + Str("schema", s.schema). + Str("table", message.Table). + Msg("Failed to create table") + return fmt.Errorf("failed to create table: %w", err) + } + + s.logger.Info(). + Str("schema", s.schema). + Str("table", message.Table). + Msg("Successfully created table") + } + + return nil +} + +// Add a method to reset synced tables (useful for testing or manual resets) +func (s *SnowflakeSink) ResetSyncedTables() { + s.syncedTables.Range(func(key, _ interface{}) bool { + s.syncedTables.Delete(key) + return true + }) +} + +// Add a method to check if a table is synced (useful for testing) +func (s *SnowflakeSink) IsTableSynced(schema, table string) bool { + tableKey := fmt.Sprintf("%s.%s", schema, table) + _, exists := s.syncedTables.Load(tableKey) + return exists +} + +// contains checks if a string exists in a slice. +func contains(slice []string, str string) bool { + for _, s := range slice { + if s == str { + return true + } + } + return false +} + +// tableExists checks if a table exists in Snowflake. +func (s *SnowflakeSink) tableExists(ctx context.Context, schema, table string) (bool, error) { + query := ` + SELECT COUNT(*) + FROM information_schema.tables + WHERE table_schema = ? + AND table_name = ? + AND table_type = 'BASE TABLE' + ` + + var count int + err := s.conn.QueryRowContext(ctx, query, schema, table).Scan(&count) + if err != nil { + s.logger.Error(). + Err(err). + Str("schema", schema). + Str("table", table). + Str("query", query). + Msg("Error checking table existence") + return false, fmt.Errorf("failed to check table existence: %w", err) + } + + exists := count > 0 + + return exists, nil +} + +// mapPostgresToSnowflakeType maps PostgreSQL data types to Snowflake equivalents. +func mapPostgresToSnowflakeType(pgType uint32) (string, error) { + switch pgType { + case pgtype.Int2OID: + return "SMALLINT", nil + case pgtype.Int4OID: + return "INTEGER", nil + case pgtype.Int8OID: + return "BIGINT", nil + case pgtype.Float4OID: + return "FLOAT", nil + case pgtype.Float8OID: + return "DOUBLE", nil + case pgtype.NumericOID: + return "NUMBER", nil + case pgtype.BoolOID: + return "BOOLEAN", nil + case pgtype.DateOID: + return "DATE", nil + case pgtype.TimestampOID: + return "TIMESTAMP_NTZ", nil + case pgtype.TimestamptzOID: + return "TIMESTAMP_TZ", nil + case pgtype.TimeOID: + return "TIME", nil + case pgtype.JSONOID, pgtype.JSONBOID: + return "VARIANT", nil + case pgtype.ByteaOID: + return "BINARY", nil + case pgtype.TextOID, pgtype.VarcharOID, pgtype.BPCharOID: + return "VARCHAR", nil + default: + return "VARCHAR", nil + } +} + +// loadTableConstraints loads table constraints from the source database. +func (s *SnowflakeSink) loadTableConstraints() error { + ctx := context.Background() + query := ` + SELECT + tc.table_schema, + tc.table_name, + kcu.column_name, + tc.constraint_type, + c.is_nullable + FROM information_schema.table_constraints AS tc + JOIN information_schema.key_column_usage AS kcu + ON tc.constraint_name = kcu.constraint_name + AND tc.table_schema = kcu.table_schema + AND tc.table_name = kcu.table_name + JOIN information_schema.columns AS c + ON c.table_schema = tc.table_schema + AND c.table_name = tc.table_name + AND c.column_name = kcu.column_name + WHERE tc.constraint_type IN ('PRIMARY KEY', 'UNIQUE') + ` + + rows, err := s.sourceConn.Query(ctx, query) + if err != nil { + return fmt.Errorf("failed to query constraints: %w", err) + } + defer rows.Close() + + for rows.Next() { + var schema, table, column, constraintType, isNullable string + if err := rows.Scan(&schema, &table, &column, &constraintType, &isNullable); err != nil { + return fmt.Errorf("failed to scan constraint row: %w", err) + } + + key := fmt.Sprintf("%s.%s", schema, table) + metadata := s.tableMetadata[key] + + switch constraintType { + case "PRIMARY KEY": + metadata.PrimaryKeys = append(metadata.PrimaryKeys, column) + case "UNIQUE": + metadata.UniqueKeys = append(metadata.UniqueKeys, []string{column}) + } + + if isNullable == "NO" { + metadata.NotNullCols = append(metadata.NotNullCols, column) + } + + s.tableMetadata[key] = metadata + } + + return nil +} + +// quotedColumns returns a comma-separated list of quoted column names. +func quotedColumns(columns []string) string { + quoted := make([]string, len(columns)) + for i, col := range columns { + quoted[i] = quoteIdentifier(col) + } + return strings.Join(quoted, ", ") +} + +// quoteIdentifier properly quotes an identifier for Snowflake. +func quoteIdentifier(identifier string) string { + return fmt.Sprintf(`"%s"`, strings.ReplaceAll(identifier, `"`, `""`)) +} + +// WriteBatch writes a batch of CDC messages to Snowflake +func (s *SnowflakeSink) WriteBatch(messages []*utils.CDCMessage) error { + ctx := context.Background() + + if s.conn == nil { + if err := s.connect(ctx); err != nil { + return fmt.Errorf("failed to connect to Snowflake: %w", err) + } + } + + return s.writeBatchInternal(ctx, messages) +} + +// writeBatchInternal handles the internal batch writing logic +func (s *SnowflakeSink) writeBatchInternal(ctx context.Context, messages []*utils.CDCMessage) error { + tx, err := s.conn.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + + defer func() { + if tx != nil { + if err := tx.Rollback(); err != nil { + s.logger.Error().Err(err).Msg("failed to rollback transaction") + } + } + }() + + for _, message := range messages { + var operationErr error + err := utils.WithRetry(ctx, s.retryConfig, func() error { + if s.conn == nil { + if err := s.connect(ctx); err != nil { + return fmt.Errorf("failed to reconnect to Snowflake: %w", err) + } + newTx, err := s.conn.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin new transaction: %w", err) + } + tx = newTx + } + + // Ensure schema/table exists + if err := s.syncSchema(ctx, message); err != nil { + return fmt.Errorf("failed to sync schema: %w", err) + } + + s.logger.Debug().Msgf("Handling %s for table %s.%s", message.Type, message.Schema, message.Table) + + switch message.Type { + case utils.OperationInsert: + operationErr = s.handleInsert(ctx, tx, message) + case utils.OperationUpdate: + operationErr = s.handleUpdate(ctx, tx, message) + case utils.OperationDelete: + operationErr = s.handleDelete(ctx, tx, message) + case utils.OperationDDL: + operationErr = s.handleDDL(ctx, tx, message) + default: + operationErr = fmt.Errorf("unknown operation type: %s", message.Type) + } + + if operationErr != nil && isSnowflakeConnectionError(operationErr) { + return operationErr // Retry on connection errors + } + return nil + }) + + if err != nil || operationErr != nil { + if tx != nil { + if rollbackErr := tx.Rollback(); rollbackErr != nil { + s.logger.Error().Err(rollbackErr).Msg("failed to rollback transaction") + } + } + tx = nil + return fmt.Errorf("failed to handle %s for table %s.%s: %w-%w", + message.Type, + message.Schema, + message.Table, + err, + operationErr, + ) + } + } + + s.logger.Debug().Msg("Committing transaction") + if err := tx.Commit(); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + tx = nil // Prevent deferred rollback + return nil +} + +// buildColumnExprs creates column expressions for the given message +func (s *SnowflakeSink) buildColumnExprs(message *utils.CDCMessage, useOldValues bool) ([]columnExpr, error) { + exprs := make([]columnExpr, 0, len(message.Columns)) + + for i, col := range message.Columns { + if message.IsColumnToasted(col.Name) { + continue + } + + var data []byte + if useOldValues && message.OldTuple != nil { + data = message.OldTuple.Columns[i].Data + } else if message.NewTuple != nil { + data = message.NewTuple.Columns[i].Data + } else { + continue + } + + value, err := utils.DecodeValue(data, col.DataType) + if err != nil { + return nil, fmt.Errorf("failed to decode column value: %w", err) + } + + sfValue, err := convertToSnowflakeValue(value, col.DataType) + if err != nil { + return nil, fmt.Errorf("failed to convert value for column %s: %w", col.Name, err) + } + + exprs = append(exprs, columnExpr{ + name: col.Name, + value: sfValue, + dataType: col.DataType, + isJSONCol: col.DataType == pgtype.JSONOID || col.DataType == pgtype.JSONBOID, + }) + } + + return exprs, nil +} + +// buildInsertQuery constructs the INSERT query and values +func (s *SnowflakeSink) buildInsertQuery(message *utils.CDCMessage, exprs []columnExpr) (string, []interface{}, error) { + columns := make([]string, 0, len(exprs)) + valueExprs := make([]string, 0, len(exprs)) + values := make([]interface{}, 0, len(exprs)) + + for _, expr := range exprs { + columns = append(columns, quoteIdentifier(expr.name)) + if expr.isJSONCol { + valueExprs = append(valueExprs, "TRY_PARSE_JSON(CAST(? AS VARCHAR))") + } else { + valueExprs = append(valueExprs, "?") + } + values = append(values, expr.value) + } + + query := fmt.Sprintf(insertTemplate, + quoteIdentifier(s.database), + quoteIdentifier(s.schema), + quoteIdentifier(message.Table), + strings.Join(columns, ", "), + strings.Join(valueExprs, ", "), + ) + + return query, values, nil +} + +// buildUpdateQuery constructs the UPDATE query and values +func (s *SnowflakeSink) buildUpdateQuery(message *utils.CDCMessage, setExprs, whereExprs []columnExpr) (string, []interface{}, error) { + setClauses := make([]string, 0, len(setExprs)) + whereConditions := make([]string, 0, len(whereExprs)) + values := make([]interface{}, 0, len(setExprs)+len(whereExprs)) + + // Build SET clause + for _, expr := range setExprs { + if expr.isJSONCol { + setClauses = append(setClauses, fmt.Sprintf("%s = TRY_PARSE_JSON(CAST(? AS VARCHAR))", quoteIdentifier(expr.name))) + } else { + setClauses = append(setClauses, fmt.Sprintf("%s = ?", quoteIdentifier(expr.name))) + } + values = append(values, expr.value) + } + + // Build WHERE clause + for _, expr := range whereExprs { + if expr.value == nil { + whereConditions = append(whereConditions, fmt.Sprintf("%s IS NULL", quoteIdentifier(expr.name))) + } else { + whereConditions = append(whereConditions, fmt.Sprintf("%s = ?", quoteIdentifier(expr.name))) + values = append(values, expr.value) + } + } + + query := fmt.Sprintf(updateTemplate, + quoteIdentifier(s.database), + quoteIdentifier(s.schema), + quoteIdentifier(message.Table), + strings.Join(setClauses, ", "), + strings.Join(whereConditions, " AND "), + ) + + return query, values, nil +} + +// buildDeleteQuery constructs the DELETE query and values +func (s *SnowflakeSink) buildDeleteQuery(message *utils.CDCMessage, whereExprs []columnExpr) (string, []interface{}, error) { + whereConditions := make([]string, 0, len(whereExprs)) + values := make([]interface{}, 0, len(whereExprs)) + + for _, expr := range whereExprs { + if expr.value == nil { + whereConditions = append(whereConditions, fmt.Sprintf("%s IS NULL", quoteIdentifier(expr.name))) + } else { + whereConditions = append(whereConditions, fmt.Sprintf("%s = ?", quoteIdentifier(expr.name))) + values = append(values, expr.value) + } + } + + query := fmt.Sprintf(deleteTemplate, + quoteIdentifier(s.database), + quoteIdentifier(s.schema), + quoteIdentifier(message.Table), + strings.Join(whereConditions, " AND "), + ) + + return query, values, nil +} + +// executeQuery executes the query with proper logging and error handling +func (s *SnowflakeSink) executeQuery(ctx context.Context, tx *sql.Tx, query string, values []interface{}) error { + _, err := tx.ExecContext(ctx, query, values...) + if err != nil { + return fmt.Errorf("query failed: %w\nQuery: %s\nValues: %v", err, query, values) + } + return nil +} + +// handleInsert processes an insert operation +func (s *SnowflakeSink) handleInsert(ctx context.Context, tx *sql.Tx, message *utils.CDCMessage) error { + exprs, err := s.buildColumnExprs(message, false) + if err != nil { + return fmt.Errorf("failed to build column expressions: %w", err) + } + + query, values, err := s.buildInsertQuery(message, exprs) + if err != nil { + return fmt.Errorf("failed to build insert query: %w", err) + } + + return s.executeQuery(ctx, tx, query, values) +} + +// handleUpdate processes an update operation +func (s *SnowflakeSink) handleUpdate(ctx context.Context, tx *sql.Tx, message *utils.CDCMessage) error { + setExprs := make([]columnExpr, 0) + for _, col := range message.Columns { + if message.IsColumnToasted(col.Name) { + continue + } + + value, err := message.GetColumnValue(col.Name, false) + if err != nil { + return fmt.Errorf("failed to get column value: %w", err) + } + + sfValue, err := convertToSnowflakeValue(value, col.DataType) + if err != nil { + return fmt.Errorf("failed to convert value for column %s: %w", col.Name, err) + } + + setExprs = append(setExprs, columnExpr{ + name: col.Name, + value: sfValue, + dataType: col.DataType, + isJSONCol: col.DataType == pgtype.JSONOID || col.DataType == pgtype.JSONBOID, + }) + } + + if len(setExprs) == 0 { + s.logger.Debug().Msg("No columns to update, skipping") + return nil + } + + // For WHERE clause, only use replication key columns + whereExprs := make([]columnExpr, 0) + if !message.ReplicationKey.IsValid() { + return fmt.Errorf("invalid replication key configuration") + } + + for _, colName := range message.ReplicationKey.Columns { + colIndex := message.GetColumnIndex(colName) + if colIndex == -1 { + return fmt.Errorf("replication key column %s not found", colName) + } + + value, err := message.GetColumnValue(colName, true) + if err != nil { + return fmt.Errorf("failed to get column value: %w", err) + } + + sfValue, err := convertToSnowflakeValue(value, message.Columns[colIndex].DataType) + if err != nil { + return fmt.Errorf("failed to convert value for column %s: %w", colName, err) + } + + whereExprs = append(whereExprs, columnExpr{ + name: colName, + value: sfValue, + dataType: message.Columns[colIndex].DataType, + isJSONCol: message.Columns[colIndex].DataType == pgtype.JSONOID || message.Columns[colIndex].DataType == pgtype.JSONBOID, + }) + } + + query, values, err := s.buildUpdateQuery(message, setExprs, whereExprs) + if err != nil { + return fmt.Errorf("failed to build update query: %w", err) + } + + return s.executeQuery(ctx, tx, query, values) +} + +// handleDelete processes a delete operation +func (s *SnowflakeSink) handleDelete(ctx context.Context, tx *sql.Tx, message *utils.CDCMessage) error { + if !message.ReplicationKey.IsValid() { + return fmt.Errorf("invalid replication key configuration") + } + + whereExprs := make([]columnExpr, 0) + for _, colName := range message.ReplicationKey.Columns { + colIndex := message.GetColumnIndex(colName) + if colIndex == -1 { + return fmt.Errorf("replication key column %s not found", colName) + } + + value, err := message.GetColumnValue(colName, true) + if err != nil { + return fmt.Errorf("failed to get column value: %w", err) + } + + sfValue, err := convertToSnowflakeValue(value, message.Columns[colIndex].DataType) + if err != nil { + return fmt.Errorf("failed to convert value for column %s: %w", colName, err) + } + + whereExprs = append(whereExprs, columnExpr{ + name: colName, + value: sfValue, + dataType: message.Columns[colIndex].DataType, + isJSONCol: message.Columns[colIndex].DataType == pgtype.JSONOID || message.Columns[colIndex].DataType == pgtype.JSONBOID, + }) + } + + query, values, err := s.buildDeleteQuery(message, whereExprs) + if err != nil { + return fmt.Errorf("failed to build delete query: %w", err) + } + + return s.executeQuery(ctx, tx, query, values) +} + +// handleDDL processes a DDL operation +func (s *SnowflakeSink) handleDDL(ctx context.Context, tx *sql.Tx, message *utils.CDCMessage) error { + ddlCommand, err := message.GetColumnValue("ddl_command", false) + if err != nil { + return fmt.Errorf("failed to get DDL command: %w", err) + } + + ddlString, ok := ddlCommand.(string) + if !ok { + return fmt.Errorf("DDL command is not a string") + } + + // Convert PostgreSQL DDL to Snowflake compatible DDL + snowflakeDDL, err := convertToSnowflakeDDL(ddlString) + if err != nil { + return fmt.Errorf("failed to convert DDL: %w", err) + } + + if snowflakeDDL == "" { + s.logger.Debug().Msg("Skipping unsupported DDL operation") + return nil + } + + _, err = tx.ExecContext(ctx, snowflakeDDL) + if err != nil { + return fmt.Errorf("failed to execute DDL: %w", err) + } + + return nil +} + +// convertToSnowflakeDDL converts PostgreSQL DDL to Snowflake compatible DDL +func convertToSnowflakeDDL(pgDDL string) (string, error) { + // This is a simplified conversion. You might need to add more cases + // based on your specific DDL requirements + pgDDL = strings.TrimSpace(strings.ToUpper(pgDDL)) + + // Skip certain PostgreSQL-specific operations + if strings.Contains(pgDDL, "CREATE INDEX") || + strings.Contains(pgDDL, "CREATE TRIGGER") || + strings.Contains(pgDDL, "ALTER TABLE ONLY") { + return "", nil + } + + // Convert PostgreSQL types to Snowflake types + ddl := strings.ReplaceAll(pgDDL, "SERIAL", "INTEGER") + ddl = strings.ReplaceAll(ddl, "BIGSERIAL", "BIGINT") + ddl = strings.ReplaceAll(ddl, "TIMESTAMP WITHOUT TIME ZONE", "TIMESTAMP_NTZ") + ddl = strings.ReplaceAll(ddl, "TIMESTAMP WITH TIME ZONE", "TIMESTAMP_TZ") + ddl = strings.ReplaceAll(ddl, "CHARACTER VARYING", "VARCHAR") + ddl = strings.ReplaceAll(ddl, "DOUBLE PRECISION", "DOUBLE") + + return ddl, nil +} + +// convertToSnowflakeValue converts PostgreSQL values to Snowflake-compatible values +func convertToSnowflakeValue(value interface{}, pgType uint32) (interface{}, error) { + if value == nil { + return nil, nil + } + + switch pgType { + case pgtype.JSONOID, pgtype.JSONBOID: + switch v := value.(type) { + case string: + return v, nil + case []byte: + return string(v), nil + default: + jsonBytes, err := json.Marshal(v) + if err != nil { + return nil, fmt.Errorf("failed to marshal JSON value: %w", err) + } + return string(jsonBytes), nil + } + case pgtype.ByteaOID: + switch v := value.(type) { + case []byte: + // Since BINARY_INPUT_FORMAT is set to 'HEX', we just need to provide the hex string + return hex.EncodeToString(v), nil + default: + return nil, fmt.Errorf("unexpected type for bytea: %T", value) + } + case pgtype.TimestampOID, pgtype.TimestamptzOID: + switch v := value.(type) { + case time.Time: + return v.Format("2006-01-02 15:04:05.999999"), nil + default: + return value, nil + } + case pgtype.DateOID: + switch v := value.(type) { + case time.Time: + return v.Format("2006-01-02"), nil + default: + return value, nil + } + case pgtype.BoolOID: + switch v := value.(type) { + case string: + return v == "t" || v == "true", nil + default: + return value, nil + } + case pgtype.NumericOID: + switch v := value.(type) { + case pgtype.Numeric: + // Convert the numeric struct to a string value + val, err := v.Value() + if err != nil { + return nil, fmt.Errorf("failed to convert numeric to string: %w", err) + } + return val, nil + case []uint8: + // Handle raw bytes from Postgres logical replication + strVal := string(v) + if strVal == "NULL" { + return nil, nil + } + return strVal, nil + case string: + return v, nil + case float64: + return strconv.FormatFloat(v, 'f', -1, 64), nil + case int64: + return strconv.FormatInt(v, 10), nil + default: + return nil, fmt.Errorf("unsupported numeric type: %T", value) + } + case pgtype.Int2ArrayOID, pgtype.Int4ArrayOID, pgtype.Int8ArrayOID, + pgtype.TextArrayOID, pgtype.VarcharArrayOID, pgtype.Float4ArrayOID, + pgtype.Float8ArrayOID: + jsonBytes, err := json.Marshal(value) + if err != nil { + return nil, fmt.Errorf("failed to marshal array value: %w", err) + } + return string(jsonBytes), nil + case pgtype.InetOID, pgtype.CIDROID, pgtype.MacaddrOID: + return fmt.Sprintf("%v", value), nil + case pgtype.UUIDOID: + return fmt.Sprintf("%v", value), nil + // Default case for other types + default: + return value, nil + } +} diff --git a/pkg/utils/cdc_encoding.go b/pkg/utils/cdc_encoding.go index 528aeb1..1d69e89 100644 --- a/pkg/utils/cdc_encoding.go +++ b/pkg/utils/cdc_encoding.go @@ -34,7 +34,20 @@ func ConvertToPgCompatibleOutput(value interface{}, oid uint32) ([]byte, error) case pgtype.Float4OID, pgtype.Float8OID: return []byte(strconv.FormatFloat(value.(float64), 'f', -1, 64)), nil case pgtype.NumericOID: - return []byte(fmt.Sprintf("%v", value)), nil + switch v := value.(type) { + case pgtype.Numeric: + val, err := v.Value() + if err != nil { + return nil, fmt.Errorf("failed to convert numeric to string: %w", err) + } + if str, ok := val.(string); ok { + return []byte(str), nil + } + return []byte(fmt.Sprintf("%v", val)), nil + default: + // For direct values during initial copy + return []byte(fmt.Sprintf("%v", value)), nil + } case pgtype.TextOID, pgtype.VarcharOID: return []byte(value.(string)), nil case pgtype.ByteaOID: @@ -46,20 +59,12 @@ func ConvertToPgCompatibleOutput(value interface{}, oid uint32) ([]byte, error) return []byte(value.(time.Time).Format(time.RFC3339Nano)), nil case pgtype.DateOID: return []byte(value.(time.Time).Format("2006-01-02")), nil - case pgtype.JSONOID: - switch v := value.(type) { - case string: - return []byte(v), nil - case []byte: - return v, nil - default: - return nil, fmt.Errorf("unsupported type for JSON data: %T", value) - } - case pgtype.JSONBOID: + case pgtype.JSONOID, pgtype.JSONBOID: if jsonBytes, ok := value.([]byte); ok { return jsonBytes, nil } return json.Marshal(value) + case pgtype.TextArrayOID, pgtype.VarcharArrayOID, pgtype.Int2ArrayOID, pgtype.Int4ArrayOID, pgtype.Int8ArrayOID, pgtype.Float4ArrayOID, pgtype.Float8ArrayOID, pgtype.BoolArrayOID: