Skip to content

Commit

Permalink
feat: MongoDB Full-Load (#5)
Browse files Browse the repository at this point in the history
Signed-off-by: Piyush Singariya <[email protected]>
Co-authored-by: hashcode-ankit <[email protected]>
  • Loading branch information
piyushdatazip and hash-data authored Nov 13, 2024
1 parent 0421068 commit 2c41cc7
Show file tree
Hide file tree
Showing 52 changed files with 2,327 additions and 579 deletions.
13 changes: 6 additions & 7 deletions build.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
function fail() {
local error="$*" || 'Unknown error'
local error="${*:-Unknown error}"
echo "$(chalk red "${error}")"
exit 1
}
Expand All @@ -15,7 +15,7 @@ function build_and_run() {
else
fail "The argument does not have a recognized prefix."
fi
cd $path
cd $path || fail "Failed to navigate to path: $path"
go mod tidy
go build -ldflags="-w -s -X constants/constants.version=${GIT_VERSION} -X constants/constants.commitsha=${GIT_COMMITSHA} -X constants/constants.releasechannel=${RELEASE_CHANNEL}" -o g5 main.go

Expand All @@ -26,9 +26,8 @@ function build_and_run() {
if [ $# -gt 0 ]; then
argument="$1"

# Capture and join remaining arguments
g5
remaining_arguments=("$@")
# Capture and join remaining arguments, skipping the first one
remaining_arguments=("${@:2}")
joined_arguments=$(
IFS=' '
echo "${remaining_arguments[*]}"
Expand All @@ -37,11 +36,11 @@ if [ $# -gt 0 ]; then
if [[ $argument == driver-* ]]; then
driver="${argument#driver-}"
echo "============================== Building driver: $driver =============================="
build_and_run "$driver" "driver" $joined_arguments
build_and_run "$driver" "driver" "$joined_arguments"
elif [[ $argument == adapter-* ]]; then
adapter="${argument#adapter-}"
echo "============================== Building adapter: $adapter =============================="
build_and_run "$adapter" "adapter" $joined_arguments
build_and_run "$adapter" "adapter" "$joined_arguments"
else
fail "The argument does not have a recognized prefix."
fi
Expand Down
23 changes: 0 additions & 23 deletions connector.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,16 @@
package olake

import (
"fmt"
"os"

"github.com/datazip-inc/olake/logger"
protocol "github.com/datazip-inc/olake/protocol"
"github.com/datazip-inc/olake/safego"
"github.com/spf13/cobra"
)

var (
globalDriver protocol.Driver
globalAdapter protocol.Adapter
)

func RegisterDriver(driver protocol.Driver) {
defer safego.Recovery(true)

if globalAdapter != nil {
logger.Fatal(fmt.Errorf("adapter already registered: %s", globalAdapter.Type()))
}

globalDriver = driver

// Execute the root command
err := protocol.CreateRootCommand(true, driver).Execute()
if err != nil {
Expand All @@ -32,13 +19,3 @@ func RegisterDriver(driver protocol.Driver) {

os.Exit(0)
}

func RegisterAdapter(adapter protocol.Adapter) (*cobra.Command, error) {
if globalDriver != nil {
return nil, fmt.Errorf("driver alraedy registered: %s", globalDriver.Type())
}

globalAdapter = adapter

return protocol.CreateRootCommand(false, adapter), nil
}
2 changes: 1 addition & 1 deletion drivers/base/bulk_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

// Pass dest with all fields initialized to handle nil state case
func ManageGlobalState[T any](state *types.State, dest *T, driver protocol.BulkDriver) error {
func ManageGlobalState[T any](state *types.State, dest *T, driver protocol.ChangeStreamDriver) error {
state.Type = driver.StateType()

if state.Global != nil {
Expand Down
10 changes: 2 additions & 8 deletions drivers/base/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,11 @@ type Driver struct {
GroupRead bool // Used in CDC mode
}

func NewBase() *Driver {
return &Driver{
SourceStreams: make(map[string]*types.Stream),
}
}

func (d *Driver) BulkRead() bool {
func (d *Driver) ChangeStreamSupported() bool {
return d.GroupRead
}

func (d *Driver) UpdateState(stream protocol.Stream, data types.RecordData) error {
func (d *Driver) UpdateState(stream protocol.Stream, data types.Record) error {
datatype, err := stream.Schema().GetType(stream.Cursor())
if err != nil {
return err
Expand Down
9 changes: 0 additions & 9 deletions drivers/base/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"time"

"github.com/datazip-inc/olake/logger"
"github.com/datazip-inc/olake/types"
)

type basestream interface {
Expand All @@ -24,11 +23,3 @@ func RetryOnFailure(attempts int, sleep *time.Duration, f func() error) (err err

return err
}

func ReformatRecord(stream basestream, record map[string]any) types.Record {
return types.Record{
Stream: stream.Name(),
Namespace: stream.Namespace(),
Data: record,
}
}
13 changes: 8 additions & 5 deletions drivers/google-sheets/go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/datazip-inc/olake/drivers/google-sheets

go 1.22

toolchain go1.22.3
go 1.23

require (
github.com/datazip-inc/olake v0.0.0-00010101000000-000000000000
Expand All @@ -20,18 +18,23 @@ require (
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/go-playground/validator/v10 v10.22.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/joomcode/errorx v1.1.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mitchellh/hashstructure v1.1.0 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/piyushsingariya/relec v0.0.18 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cobra v1.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
20 changes: 14 additions & 6 deletions drivers/google-sheets/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-playground/validator/v10 v10.22.1 h1:40JcKH+bBNGFczGuoBYgX4I6m/i27HYW8P9FDk5PbgA=
github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand All @@ -30,6 +30,10 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/joomcode/errorx v1.1.0 h1:dizuSG6yHzlvXOOGHW00gwsmM4Sb9x/yWEfdtPztqcs=
Expand All @@ -40,6 +44,10 @@ github.com/mitchellh/hashstructure v1.1.0 h1:P6P1hdjqAAknpY/M1CGipelZgp+4y9ja9km
github.com/mitchellh/hashstructure v1.1.0/go.mod h1:xUDAozZz0Wmdiufv0uyhnHkUTN6/6d8ulp4AwfLKrmA=
github.com/mozillazg/go-unidecode v0.2.0 h1:vFGEzAH9KSwyWmXCOblazEWDh7fOkpmy/Z4ArmamSUc=
github.com/mozillazg/go-unidecode v0.2.0/go.mod h1:zB48+/Z5toiRolOZy9ksLryJ976VIwmDmpQ2quyt1aA=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/piyushsingariya/relec v0.0.18 h1:7HqJ/1ir9awX8X8FRw+qEWMIeu9DhnVUNYSb4tJa0bU=
github.com/piyushsingariya/relec v0.0.18/go.mod h1:/uSOTyUESjm5ow515VHtwCSopkbBVHGOKwgmiKVwVnA=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand All @@ -65,12 +73,12 @@ golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/oauth2 v0.10.0 h1:zHCpF2Khkwy4mMB4bv0U37YtJdTGW8jI0glAApi0Kh8=
golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
Expand Down
15 changes: 9 additions & 6 deletions drivers/hubspot/go.mod
Original file line number Diff line number Diff line change
@@ -1,33 +1,36 @@
module github.com/datazip-inc/olake/drivers/hubspot

go 1.22

toolchain go1.22.3
go 1.23

require (
github.com/datazip-inc/olake v0.0.0-20230727050722-6795340c7033
github.com/goccy/go-json v0.10.3
golang.org/x/oauth2 v0.11.0
)

require (
github.com/brainicorn/ganno v0.0.0-20220304182003-e638228cd865 // indirect
github.com/brainicorn/goblex v0.0.0-20220304181919-81f017b0ee95 // indirect
github.com/datazip-inc/olake v0.0.0-00010101000000-000000000000
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/go-playground/validator/v10 v10.22.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/joomcode/errorx v1.1.1 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mitchellh/hashstructure v1.1.0 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/piyushsingariya/relec v0.0.18 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cobra v1.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
20 changes: 14 additions & 6 deletions drivers/hubspot/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-playground/validator/v10 v10.22.1 h1:40JcKH+bBNGFczGuoBYgX4I6m/i27HYW8P9FDk5PbgA=
github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand All @@ -26,6 +26,10 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/joomcode/errorx v1.1.1 h1:/LFG/qSk1gUTuZjs+qlyOJEpcVjD9DXgBNFhdZkQrjY=
Expand All @@ -34,6 +38,10 @@ github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/mitchellh/hashstructure v1.1.0 h1:P6P1hdjqAAknpY/M1CGipelZgp+4y9ja9kmUZPXP+H0=
github.com/mitchellh/hashstructure v1.1.0/go.mod h1:xUDAozZz0Wmdiufv0uyhnHkUTN6/6d8ulp4AwfLKrmA=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/piyushsingariya/relec v0.0.18 h1:7HqJ/1ir9awX8X8FRw+qEWMIeu9DhnVUNYSb4tJa0bU=
github.com/piyushsingariya/relec v0.0.18/go.mod h1:/uSOTyUESjm5ow515VHtwCSopkbBVHGOKwgmiKVwVnA=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand All @@ -59,12 +67,12 @@ golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU=
golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
Expand Down
18 changes: 9 additions & 9 deletions drivers/hubspot/internal/base_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,13 @@ func (s *Stream) trasformSingleRecord(record map[string]any) map[string]any {
return record
}

func (s *Stream) transform(records []types.RecordData, err error) ([]types.RecordData, error) {
func (s *Stream) transform(records []types.Record, err error) ([]types.Record, error) {
if err != nil {
return nil, err
}

// Preprocess record before emitting
transformed := []types.RecordData{}
transformed := []types.Record{}
for _, record := range records {
record = s.castRecordFieldsIfNeeded(record)
if s.createdAtField != "" && s.updatedAtField != "" && record[s.updatedAtField] == nil {
Expand All @@ -272,8 +272,8 @@ func (s *Stream) transform(records []types.RecordData, err error) ([]types.Recor
return transformed, nil
}

func (s *Stream) filterOldRecords(records []types.RecordData) []map[string]any {
stream := []types.RecordData{}
func (s *Stream) filterOldRecords(records []types.Record) []map[string]any {
stream := []types.Record{}

for _, record := range records {
if uat, found := record[s.updatedAtField]; found {
Expand All @@ -294,7 +294,7 @@ func (s *Stream) filterOldRecords(records []types.RecordData) []map[string]any {
return stream
}

func (s *Stream) flatAssociations(records []types.RecordData) []types.RecordData {
func (s *Stream) flatAssociations(records []types.Record) []types.Record {
// When result has associations we prefer to have it flat, so we transform this

// "associations": {
Expand All @@ -307,7 +307,7 @@ func (s *Stream) flatAssociations(records []types.RecordData) []types.RecordData

// "contacts": [201, 251]

stream := []types.RecordData{}
stream := []types.Record{}
for _, record := range records {
if value, found := record["associations"]; found {
delete(record, "associations")
Expand Down Expand Up @@ -408,8 +408,8 @@ func (s *Stream) handleRequest(request *utils.Request) (int, any, error) {
return statusCode, response, nil
}

func (s *Stream) parseResponse(response interface{}) ([]types.RecordData, error) {
records := []types.RecordData{}
func (s *Stream) parseResponse(response interface{}) ([]types.Record, error) {
records := []types.Record{}
if utils.IsInstance(response, reflect.Map) {
response := response.(map[string]any)
if response["status"] != nil && response["status"] == "error" {
Expand Down Expand Up @@ -451,7 +451,7 @@ func (s *Stream) parseResponse(response interface{}) ([]types.RecordData, error)
return records, nil
}

func (s *Stream) readStreamRecords(nextPageToken map[string]any, f func() (path, method string)) ([]types.RecordData, any, error) {
func (s *Stream) readStreamRecords(nextPageToken map[string]any, f func() (path, method string)) ([]types.Record, any, error) {
// properties = self._property_wrapper
// for chunk in properties.split():
// response = self.handle_request(
Expand Down
2 changes: 1 addition & 1 deletion drivers/hubspot/internal/crm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (c *CRMSearchStream) readRecords(send chan<- types.Record) error {
latest_cursor := &time.Time{}

for !paginationComplete {
var records []types.RecordData
var records []types.Record
var rawResponse any
var err error

Expand Down
Loading

0 comments on commit 2c41cc7

Please sign in to comment.