Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mongodb #31

Merged
merged 3 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions build.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
function fail() {
local error="$*" || 'Unknown error'
local error="${*:-Unknown error}"
echo "$(chalk red "${error}")"
exit 1
}
Expand All @@ -15,7 +15,7 @@ function build_and_run() {
else
fail "The argument does not have a recognized prefix."
fi
cd $path
cd $path || fail "Failed to navigate to path: $path"
go mod tidy
go build -ldflags="-w -s -X constants/constants.version=${GIT_VERSION} -X constants/constants.commitsha=${GIT_COMMITSHA} -X constants/constants.releasechannel=${RELEASE_CHANNEL}" -o g5 main.go

Expand All @@ -26,9 +26,8 @@ function build_and_run() {
if [ $# -gt 0 ]; then
argument="$1"

# Capture and join remaining arguments
g5
remaining_arguments=("$@")
# Capture and join remaining arguments, skipping the first one
remaining_arguments=("${@:2}")
joined_arguments=$(
IFS=' '
echo "${remaining_arguments[*]}"
Expand All @@ -37,11 +36,11 @@ if [ $# -gt 0 ]; then
if [[ $argument == driver-* ]]; then
driver="${argument#driver-}"
echo "============================== Building driver: $driver =============================="
build_and_run "$driver" "driver" $joined_arguments
build_and_run "$driver" "driver" "$joined_arguments"
elif [[ $argument == adapter-* ]]; then
adapter="${argument#adapter-}"
echo "============================== Building adapter: $adapter =============================="
build_and_run "$adapter" "adapter" $joined_arguments
build_and_run "$adapter" "adapter" "$joined_arguments"
else
fail "The argument does not have a recognized prefix."
fi
Expand Down
23 changes: 0 additions & 23 deletions connector.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,16 @@
package olake

import (
"fmt"
"os"

"github.com/datazip-inc/olake/logger"
protocol "github.com/datazip-inc/olake/protocol"
"github.com/datazip-inc/olake/safego"
"github.com/spf13/cobra"
)

var (
globalDriver protocol.Driver
globalAdapter protocol.Adapter
)

func RegisterDriver(driver protocol.Driver) {
defer safego.Recovery(true)

if globalAdapter != nil {
logger.Fatal(fmt.Errorf("adapter already registered: %s", globalAdapter.Type()))
}

globalDriver = driver

// Execute the root command
err := protocol.CreateRootCommand(true, driver).Execute()
if err != nil {
Expand All @@ -32,13 +19,3 @@ func RegisterDriver(driver protocol.Driver) {

os.Exit(0)
}

func RegisterAdapter(adapter protocol.Adapter) (*cobra.Command, error) {
if globalDriver != nil {
return nil, fmt.Errorf("driver alraedy registered: %s", globalDriver.Type())
}

globalAdapter = adapter

return protocol.CreateRootCommand(false, adapter), nil
}
2 changes: 1 addition & 1 deletion drivers/base/bulk_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

// Pass dest with all fields initialized to handle nil state case
func ManageGlobalState[T any](state *types.State, dest *T, driver protocol.BulkDriver) error {
func ManageGlobalState[T any](state *types.State, dest *T, driver protocol.ChangeStreamDriver) error {
state.Type = driver.StateType()

if state.Global != nil {
Expand Down
10 changes: 2 additions & 8 deletions drivers/base/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,11 @@
GroupRead bool // Used in CDC mode
}

func NewBase() *Driver {
return &Driver{
SourceStreams: make(map[string]*types.Stream),
}
}

func (d *Driver) BulkRead() bool {
func (d *Driver) ChangeStreamSupported() bool {
return d.GroupRead
}

func (d *Driver) UpdateState(stream protocol.Stream, data types.RecordData) error {
func (d *Driver) UpdateState(stream protocol.Stream, data types.Record) error {
datatype, err := stream.Schema().GetType(stream.Cursor())
if err != nil {
return err
Expand All @@ -29,16 +23,16 @@

if cursorVal, found := data[stream.Cursor()]; found && cursorVal != nil {
// compare with current state
if stream.GetState() != nil {

Check failure on line 26 in drivers/base/driver.go

View workflow job for this annotation

GitHub Actions / GoSec Security Scanner

stream.GetState undefined (type protocol.Stream has no field or method GetState)

Check failure on line 26 in drivers/base/driver.go

View workflow job for this annotation

GitHub Actions / GoSec Security Scanner

stream.GetState undefined (type protocol.Stream has no field or method GetState)
state, err := typeutils.MaximumOnDataType(datatype, stream.GetState(), cursorVal)

Check failure on line 27 in drivers/base/driver.go

View workflow job for this annotation

GitHub Actions / GoSec Security Scanner

stream.GetState undefined (type protocol.Stream has no field or method GetState)

Check failure on line 27 in drivers/base/driver.go

View workflow job for this annotation

GitHub Actions / GoSec Security Scanner

stream.GetState undefined (type protocol.Stream has no field or method GetState)
if err != nil {
return err
}

stream.SetState(state)

Check failure on line 32 in drivers/base/driver.go

View workflow job for this annotation

GitHub Actions / GoSec Security Scanner

stream.SetState undefined (type protocol.Stream has no field or method SetState)

Check failure on line 32 in drivers/base/driver.go

View workflow job for this annotation

GitHub Actions / GoSec Security Scanner

stream.SetState undefined (type protocol.Stream has no field or method SetState)
} else {
// directly update
stream.SetState(cursorVal)

Check failure on line 35 in drivers/base/driver.go

View workflow job for this annotation

GitHub Actions / GoSec Security Scanner

stream.SetState undefined (type protocol.Stream has no field or method SetState)

Check failure on line 35 in drivers/base/driver.go

View workflow job for this annotation

GitHub Actions / GoSec Security Scanner

stream.SetState undefined (type protocol.Stream has no field or method SetState)
}
}

Expand Down
9 changes: 0 additions & 9 deletions drivers/base/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"time"

"github.com/datazip-inc/olake/logger"
"github.com/datazip-inc/olake/types"
)

type basestream interface {
Expand All @@ -24,11 +23,3 @@ func RetryOnFailure(attempts int, sleep *time.Duration, f func() error) (err err

return err
}

func ReformatRecord(stream basestream, record map[string]any) types.Record {
return types.Record{
Stream: stream.Name(),
Namespace: stream.Namespace(),
Data: record,
}
}
13 changes: 8 additions & 5 deletions drivers/google-sheets/go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/datazip-inc/olake/drivers/google-sheets

go 1.22

toolchain go1.22.3
go 1.23

require (
github.com/datazip-inc/olake v0.0.0-00010101000000-000000000000
Expand All @@ -20,18 +18,23 @@ require (
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/go-playground/validator/v10 v10.22.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/joomcode/errorx v1.1.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mitchellh/hashstructure v1.1.0 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/piyushsingariya/relec v0.0.18 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cobra v1.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
20 changes: 14 additions & 6 deletions drivers/google-sheets/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-playground/validator/v10 v10.22.1 h1:40JcKH+bBNGFczGuoBYgX4I6m/i27HYW8P9FDk5PbgA=
github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand All @@ -30,6 +30,10 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/joomcode/errorx v1.1.0 h1:dizuSG6yHzlvXOOGHW00gwsmM4Sb9x/yWEfdtPztqcs=
Expand All @@ -40,6 +44,10 @@ github.com/mitchellh/hashstructure v1.1.0 h1:P6P1hdjqAAknpY/M1CGipelZgp+4y9ja9km
github.com/mitchellh/hashstructure v1.1.0/go.mod h1:xUDAozZz0Wmdiufv0uyhnHkUTN6/6d8ulp4AwfLKrmA=
github.com/mozillazg/go-unidecode v0.2.0 h1:vFGEzAH9KSwyWmXCOblazEWDh7fOkpmy/Z4ArmamSUc=
github.com/mozillazg/go-unidecode v0.2.0/go.mod h1:zB48+/Z5toiRolOZy9ksLryJ976VIwmDmpQ2quyt1aA=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/piyushsingariya/relec v0.0.18 h1:7HqJ/1ir9awX8X8FRw+qEWMIeu9DhnVUNYSb4tJa0bU=
github.com/piyushsingariya/relec v0.0.18/go.mod h1:/uSOTyUESjm5ow515VHtwCSopkbBVHGOKwgmiKVwVnA=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand All @@ -65,12 +73,12 @@ golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/oauth2 v0.10.0 h1:zHCpF2Khkwy4mMB4bv0U37YtJdTGW8jI0glAApi0Kh8=
golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
Expand Down
15 changes: 9 additions & 6 deletions drivers/hubspot/go.mod
Original file line number Diff line number Diff line change
@@ -1,33 +1,36 @@
module github.com/datazip-inc/olake/drivers/hubspot

go 1.22

toolchain go1.22.3
go 1.23

require (
github.com/datazip-inc/olake v0.0.0-20230727050722-6795340c7033
github.com/goccy/go-json v0.10.3
golang.org/x/oauth2 v0.11.0
)

require (
github.com/brainicorn/ganno v0.0.0-20220304182003-e638228cd865 // indirect
github.com/brainicorn/goblex v0.0.0-20220304181919-81f017b0ee95 // indirect
github.com/datazip-inc/olake v0.0.0-00010101000000-000000000000
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/go-playground/validator/v10 v10.22.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/joomcode/errorx v1.1.1 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mitchellh/hashstructure v1.1.0 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/piyushsingariya/relec v0.0.18 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cobra v1.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
20 changes: 14 additions & 6 deletions drivers/hubspot/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-playground/validator/v10 v10.22.1 h1:40JcKH+bBNGFczGuoBYgX4I6m/i27HYW8P9FDk5PbgA=
github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand All @@ -26,6 +26,10 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/joomcode/errorx v1.1.1 h1:/LFG/qSk1gUTuZjs+qlyOJEpcVjD9DXgBNFhdZkQrjY=
Expand All @@ -34,6 +38,10 @@ github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/mitchellh/hashstructure v1.1.0 h1:P6P1hdjqAAknpY/M1CGipelZgp+4y9ja9kmUZPXP+H0=
github.com/mitchellh/hashstructure v1.1.0/go.mod h1:xUDAozZz0Wmdiufv0uyhnHkUTN6/6d8ulp4AwfLKrmA=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/piyushsingariya/relec v0.0.18 h1:7HqJ/1ir9awX8X8FRw+qEWMIeu9DhnVUNYSb4tJa0bU=
github.com/piyushsingariya/relec v0.0.18/go.mod h1:/uSOTyUESjm5ow515VHtwCSopkbBVHGOKwgmiKVwVnA=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand All @@ -59,12 +67,12 @@ golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU=
golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
Expand Down
18 changes: 9 additions & 9 deletions drivers/hubspot/internal/base_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,13 @@ func (s *Stream) trasformSingleRecord(record map[string]any) map[string]any {
return record
}

func (s *Stream) transform(records []types.RecordData, err error) ([]types.RecordData, error) {
func (s *Stream) transform(records []types.Record, err error) ([]types.Record, error) {
if err != nil {
return nil, err
}

// Preprocess record before emitting
transformed := []types.RecordData{}
transformed := []types.Record{}
for _, record := range records {
record = s.castRecordFieldsIfNeeded(record)
if s.createdAtField != "" && s.updatedAtField != "" && record[s.updatedAtField] == nil {
Expand All @@ -272,8 +272,8 @@ func (s *Stream) transform(records []types.RecordData, err error) ([]types.Recor
return transformed, nil
}

func (s *Stream) filterOldRecords(records []types.RecordData) []map[string]any {
stream := []types.RecordData{}
func (s *Stream) filterOldRecords(records []types.Record) []map[string]any {
stream := []types.Record{}

for _, record := range records {
if uat, found := record[s.updatedAtField]; found {
Expand All @@ -294,7 +294,7 @@ func (s *Stream) filterOldRecords(records []types.RecordData) []map[string]any {
return stream
}

func (s *Stream) flatAssociations(records []types.RecordData) []types.RecordData {
func (s *Stream) flatAssociations(records []types.Record) []types.Record {
// When result has associations we prefer to have it flat, so we transform this

// "associations": {
Expand All @@ -307,7 +307,7 @@ func (s *Stream) flatAssociations(records []types.RecordData) []types.RecordData

// "contacts": [201, 251]

stream := []types.RecordData{}
stream := []types.Record{}
for _, record := range records {
if value, found := record["associations"]; found {
delete(record, "associations")
Expand Down Expand Up @@ -408,8 +408,8 @@ func (s *Stream) handleRequest(request *utils.Request) (int, any, error) {
return statusCode, response, nil
}

func (s *Stream) parseResponse(response interface{}) ([]types.RecordData, error) {
records := []types.RecordData{}
func (s *Stream) parseResponse(response interface{}) ([]types.Record, error) {
records := []types.Record{}
if utils.IsInstance(response, reflect.Map) {
response := response.(map[string]any)
if response["status"] != nil && response["status"] == "error" {
Expand Down Expand Up @@ -451,7 +451,7 @@ func (s *Stream) parseResponse(response interface{}) ([]types.RecordData, error)
return records, nil
}

func (s *Stream) readStreamRecords(nextPageToken map[string]any, f func() (path, method string)) ([]types.RecordData, any, error) {
func (s *Stream) readStreamRecords(nextPageToken map[string]any, f func() (path, method string)) ([]types.Record, any, error) {
// properties = self._property_wrapper
// for chunk in properties.split():
// response = self.handle_request(
Expand Down
2 changes: 1 addition & 1 deletion drivers/hubspot/internal/crm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (c *CRMSearchStream) readRecords(send chan<- types.Record) error {
latest_cursor := &time.Time{}

for !paginationComplete {
var records []types.RecordData
var records []types.Record
var rawResponse any
var err error

Expand Down
Loading
Loading