Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Writers #6

Merged
merged 14 commits into from
Nov 13, 2024
13 changes: 6 additions & 7 deletions build.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
function fail() {
local error="$*" || 'Unknown error'
local error="${*:-Unknown error}"
echo "$(chalk red "${error}")"
exit 1
}
Expand All @@ -15,7 +15,7 @@ function build_and_run() {
else
fail "The argument does not have a recognized prefix."
fi
cd $path
cd $path || fail "Failed to navigate to path: $path"
go mod tidy
go build -ldflags="-w -s -X constants/constants.version=${GIT_VERSION} -X constants/constants.commitsha=${GIT_COMMITSHA} -X constants/constants.releasechannel=${RELEASE_CHANNEL}" -o g5 main.go

Expand All @@ -26,9 +26,8 @@ function build_and_run() {
if [ $# -gt 0 ]; then
argument="$1"

# Capture and join remaining arguments
g5
remaining_arguments=("$@")
# Capture and join remaining arguments, skipping the first one
remaining_arguments=("${@:2}")
joined_arguments=$(
IFS=' '
echo "${remaining_arguments[*]}"
Expand All @@ -37,11 +36,11 @@ if [ $# -gt 0 ]; then
if [[ $argument == driver-* ]]; then
driver="${argument#driver-}"
echo "============================== Building driver: $driver =============================="
build_and_run "$driver" "driver" $joined_arguments
build_and_run "$driver" "driver" "$joined_arguments"
elif [[ $argument == adapter-* ]]; then
adapter="${argument#adapter-}"
echo "============================== Building adapter: $adapter =============================="
build_and_run "$adapter" "adapter" $joined_arguments
build_and_run "$adapter" "adapter" "$joined_arguments"
else
fail "The argument does not have a recognized prefix."
fi
Expand Down
23 changes: 0 additions & 23 deletions connector.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,16 @@
package olake

import (
"fmt"
"os"

"github.com/datazip-inc/olake/logger"
protocol "github.com/datazip-inc/olake/protocol"
"github.com/datazip-inc/olake/safego"
"github.com/spf13/cobra"
)

var (
globalDriver protocol.Driver
globalAdapter protocol.Adapter
)

func RegisterDriver(driver protocol.Driver) {
defer safego.Recovery(true)

if globalAdapter != nil {
logger.Fatal(fmt.Errorf("adapter already registered: %s", globalAdapter.Type()))
}

globalDriver = driver

// Execute the root command
err := protocol.CreateRootCommand(true, driver).Execute()
if err != nil {
Expand All @@ -32,13 +19,3 @@ func RegisterDriver(driver protocol.Driver) {

os.Exit(0)
}

func RegisterAdapter(adapter protocol.Adapter) (*cobra.Command, error) {
if globalDriver != nil {
return nil, fmt.Errorf("driver alraedy registered: %s", globalDriver.Type())
}

globalAdapter = adapter

return protocol.CreateRootCommand(false, adapter), nil
}
2 changes: 1 addition & 1 deletion drivers/base/bulk_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

// Pass dest with all fields initialized to handle nil state case
func ManageGlobalState[T any](state *types.State, dest *T, driver protocol.BulkDriver) error {
func ManageGlobalState[T any](state *types.State, dest *T, driver protocol.ChangeStreamDriver) error {
state.Type = driver.StateType()

if state.Global != nil {
Expand Down
10 changes: 2 additions & 8 deletions drivers/base/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,11 @@ type Driver struct {
GroupRead bool // Used in CDC mode
}

func NewBase() *Driver {
return &Driver{
SourceStreams: make(map[string]*types.Stream),
}
}

func (d *Driver) BulkRead() bool {
func (d *Driver) ChangeStreamSupported() bool {
return d.GroupRead
}

func (d *Driver) UpdateState(stream protocol.Stream, data types.RecordData) error {
func (d *Driver) UpdateState(stream protocol.Stream, data types.Record) error {
datatype, err := stream.Schema().GetType(stream.Cursor())
if err != nil {
return err
Expand Down
9 changes: 0 additions & 9 deletions drivers/base/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"time"

"github.com/datazip-inc/olake/logger"
"github.com/datazip-inc/olake/types"
)

type basestream interface {
Expand All @@ -24,11 +23,3 @@ func RetryOnFailure(attempts int, sleep *time.Duration, f func() error) (err err

return err
}

func ReformatRecord(stream basestream, record map[string]any) types.Record {
return types.Record{
Stream: stream.Name(),
Namespace: stream.Namespace(),
Data: record,
}
}
13 changes: 8 additions & 5 deletions drivers/google-sheets/go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/datazip-inc/olake/drivers/google-sheets

go 1.22

toolchain go1.22.3
go 1.23

require (
github.com/datazip-inc/olake v0.0.0-00010101000000-000000000000
Expand All @@ -20,18 +18,23 @@ require (
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/go-playground/validator/v10 v10.22.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/joomcode/errorx v1.1.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mitchellh/hashstructure v1.1.0 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/piyushsingariya/relec v0.0.18 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cobra v1.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
20 changes: 14 additions & 6 deletions drivers/google-sheets/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-playground/validator/v10 v10.22.1 h1:40JcKH+bBNGFczGuoBYgX4I6m/i27HYW8P9FDk5PbgA=
github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand All @@ -30,6 +30,10 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/joomcode/errorx v1.1.0 h1:dizuSG6yHzlvXOOGHW00gwsmM4Sb9x/yWEfdtPztqcs=
Expand All @@ -40,6 +44,10 @@ github.com/mitchellh/hashstructure v1.1.0 h1:P6P1hdjqAAknpY/M1CGipelZgp+4y9ja9km
github.com/mitchellh/hashstructure v1.1.0/go.mod h1:xUDAozZz0Wmdiufv0uyhnHkUTN6/6d8ulp4AwfLKrmA=
github.com/mozillazg/go-unidecode v0.2.0 h1:vFGEzAH9KSwyWmXCOblazEWDh7fOkpmy/Z4ArmamSUc=
github.com/mozillazg/go-unidecode v0.2.0/go.mod h1:zB48+/Z5toiRolOZy9ksLryJ976VIwmDmpQ2quyt1aA=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/piyushsingariya/relec v0.0.18 h1:7HqJ/1ir9awX8X8FRw+qEWMIeu9DhnVUNYSb4tJa0bU=
github.com/piyushsingariya/relec v0.0.18/go.mod h1:/uSOTyUESjm5ow515VHtwCSopkbBVHGOKwgmiKVwVnA=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand All @@ -65,12 +73,12 @@ golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/oauth2 v0.10.0 h1:zHCpF2Khkwy4mMB4bv0U37YtJdTGW8jI0glAApi0Kh8=
golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
Expand Down
15 changes: 9 additions & 6 deletions drivers/hubspot/go.mod
Original file line number Diff line number Diff line change
@@ -1,33 +1,36 @@
module github.com/datazip-inc/olake/drivers/hubspot

go 1.22

toolchain go1.22.3
go 1.23

require (
github.com/datazip-inc/olake v0.0.0-20230727050722-6795340c7033
github.com/goccy/go-json v0.10.3
golang.org/x/oauth2 v0.11.0
)

require (
github.com/brainicorn/ganno v0.0.0-20220304182003-e638228cd865 // indirect
github.com/brainicorn/goblex v0.0.0-20220304181919-81f017b0ee95 // indirect
github.com/datazip-inc/olake v0.0.0-00010101000000-000000000000
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/go-playground/validator/v10 v10.22.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/joomcode/errorx v1.1.1 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mitchellh/hashstructure v1.1.0 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/piyushsingariya/relec v0.0.18 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cobra v1.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
20 changes: 14 additions & 6 deletions drivers/hubspot/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-playground/validator/v10 v10.22.1 h1:40JcKH+bBNGFczGuoBYgX4I6m/i27HYW8P9FDk5PbgA=
github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand All @@ -26,6 +26,10 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/joomcode/errorx v1.1.1 h1:/LFG/qSk1gUTuZjs+qlyOJEpcVjD9DXgBNFhdZkQrjY=
Expand All @@ -34,6 +38,10 @@ github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/mitchellh/hashstructure v1.1.0 h1:P6P1hdjqAAknpY/M1CGipelZgp+4y9ja9kmUZPXP+H0=
github.com/mitchellh/hashstructure v1.1.0/go.mod h1:xUDAozZz0Wmdiufv0uyhnHkUTN6/6d8ulp4AwfLKrmA=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/piyushsingariya/relec v0.0.18 h1:7HqJ/1ir9awX8X8FRw+qEWMIeu9DhnVUNYSb4tJa0bU=
github.com/piyushsingariya/relec v0.0.18/go.mod h1:/uSOTyUESjm5ow515VHtwCSopkbBVHGOKwgmiKVwVnA=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand All @@ -59,12 +67,12 @@ golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU=
golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
Expand Down
18 changes: 9 additions & 9 deletions drivers/hubspot/internal/base_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,13 @@ func (s *Stream) trasformSingleRecord(record map[string]any) map[string]any {
return record
}

func (s *Stream) transform(records []types.RecordData, err error) ([]types.RecordData, error) {
func (s *Stream) transform(records []types.Record, err error) ([]types.Record, error) {
if err != nil {
return nil, err
}

// Preprocess record before emitting
transformed := []types.RecordData{}
transformed := []types.Record{}
for _, record := range records {
record = s.castRecordFieldsIfNeeded(record)
if s.createdAtField != "" && s.updatedAtField != "" && record[s.updatedAtField] == nil {
Expand All @@ -272,8 +272,8 @@ func (s *Stream) transform(records []types.RecordData, err error) ([]types.Recor
return transformed, nil
}

func (s *Stream) filterOldRecords(records []types.RecordData) []map[string]any {
stream := []types.RecordData{}
func (s *Stream) filterOldRecords(records []types.Record) []map[string]any {
stream := []types.Record{}

for _, record := range records {
if uat, found := record[s.updatedAtField]; found {
Expand All @@ -294,7 +294,7 @@ func (s *Stream) filterOldRecords(records []types.RecordData) []map[string]any {
return stream
}

func (s *Stream) flatAssociations(records []types.RecordData) []types.RecordData {
func (s *Stream) flatAssociations(records []types.Record) []types.Record {
// When result has associations we prefer to have it flat, so we transform this

// "associations": {
Expand All @@ -307,7 +307,7 @@ func (s *Stream) flatAssociations(records []types.RecordData) []types.RecordData

// "contacts": [201, 251]

stream := []types.RecordData{}
stream := []types.Record{}
for _, record := range records {
if value, found := record["associations"]; found {
delete(record, "associations")
Expand Down Expand Up @@ -408,8 +408,8 @@ func (s *Stream) handleRequest(request *utils.Request) (int, any, error) {
return statusCode, response, nil
}

func (s *Stream) parseResponse(response interface{}) ([]types.RecordData, error) {
records := []types.RecordData{}
func (s *Stream) parseResponse(response interface{}) ([]types.Record, error) {
records := []types.Record{}
if utils.IsInstance(response, reflect.Map) {
response := response.(map[string]any)
if response["status"] != nil && response["status"] == "error" {
Expand Down Expand Up @@ -451,7 +451,7 @@ func (s *Stream) parseResponse(response interface{}) ([]types.RecordData, error)
return records, nil
}

func (s *Stream) readStreamRecords(nextPageToken map[string]any, f func() (path, method string)) ([]types.RecordData, any, error) {
func (s *Stream) readStreamRecords(nextPageToken map[string]any, f func() (path, method string)) ([]types.Record, any, error) {
// properties = self._property_wrapper
// for chunk in properties.split():
// response = self.handle_request(
Expand Down
2 changes: 1 addition & 1 deletion drivers/hubspot/internal/crm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (c *CRMSearchStream) readRecords(send chan<- types.Record) error {
latest_cursor := &time.Time{}

for !paginationComplete {
var records []types.RecordData
var records []types.Record
var rawResponse any
var err error

Expand Down
Loading
Loading