From 5693351efe0a6668877e67d5ffa0607f2571bf2e Mon Sep 17 00:00:00 2001 From: hashcode-ankit Date: Wed, 13 Nov 2024 13:36:01 +0530 Subject: [PATCH] feat: discover command running success --- build.sh | 15 ++-- drivers/base/driver.go | 7 ++ drivers/google-sheets/go.mod | 4 +- drivers/hubspot/go.mod | 4 +- drivers/mongodb/examples/config.json | 15 ++++ drivers/mongodb/internal/config.go | 20 ++--- drivers/mongodb/internal/mon.go | 110 ++++++++++++++++----------- drivers/mongodb/main.go | 5 +- drivers/postgres/go.mod | 2 +- drivers/s3/go.mod | 4 +- go.mod | 4 +- go.work | 3 +- go.work.sum | 34 +++++---- types/stream.go | 6 ++ typeutils/datatype.go | 5 ++ 15 files changed, 143 insertions(+), 95 deletions(-) create mode 100644 drivers/mongodb/examples/config.json diff --git a/build.sh b/build.sh index f6219dc..8a6f78e 100755 --- a/build.sh +++ b/build.sh @@ -1,5 +1,5 @@ function fail() { - local error="$*" || 'Unknown error' + local error="${*:-Unknown error}" echo "$(chalk red "${error}")" exit 1 } @@ -15,7 +15,7 @@ function build_and_run() { else fail "The argument does not have a recognized prefix." fi - cd $path + cd $path || fail "Failed to navigate to path: $path" go mod tidy go build -ldflags="-w -s -X constants/constants.version=${GIT_VERSION} -X constants/constants.commitsha=${GIT_COMMITSHA} -X constants/constants.releasechannel=${RELEASE_CHANNEL}" -o g5 main.go @@ -26,9 +26,8 @@ function build_and_run() { if [ $# -gt 0 ]; then argument="$1" - # Capture and join remaining arguments - g5 - remaining_arguments=("$@") + # Capture and join remaining arguments, skipping the first one + remaining_arguments=("${@:2}") joined_arguments=$( IFS=' ' echo "${remaining_arguments[*]}" @@ -37,14 +36,14 @@ if [ $# -gt 0 ]; then if [[ $argument == driver-* ]]; then driver="${argument#driver-}" echo "============================== Building driver: $driver ==============================" - build_and_run "$driver" "driver" $joined_arguments + build_and_run "$driver" "driver" "$joined_arguments" elif [[ $argument == adapter-* ]]; then adapter="${argument#adapter-}" echo "============================== Building adapter: $adapter ==============================" - build_and_run "$adapter" "adapter" $joined_arguments + build_and_run "$adapter" "adapter" "$joined_arguments" else fail "The argument does not have a recognized prefix." fi else fail "No arguments provided." -fi +fi \ No newline at end of file diff --git a/drivers/base/driver.go b/drivers/base/driver.go index 381daca..8a48cfe 100644 --- a/drivers/base/driver.go +++ b/drivers/base/driver.go @@ -38,3 +38,10 @@ func (d *Driver) UpdateState(stream protocol.Stream, data types.Record) error { return nil } + +func NewBase() *Driver { + return &Driver{ + SourceStreams: make(map[string]*types.Stream), + GroupRead: false, + } +} diff --git a/drivers/google-sheets/go.mod b/drivers/google-sheets/go.mod index 643015e..369ed64 100644 --- a/drivers/google-sheets/go.mod +++ b/drivers/google-sheets/go.mod @@ -1,8 +1,6 @@ module github.com/datazip-inc/olake/drivers/google-sheets -go 1.22 - -toolchain go1.22.3 +go 1.23 require ( github.com/datazip-inc/olake v0.0.0-00010101000000-000000000000 diff --git a/drivers/hubspot/go.mod b/drivers/hubspot/go.mod index 0634193..01cbe47 100644 --- a/drivers/hubspot/go.mod +++ b/drivers/hubspot/go.mod @@ -1,8 +1,6 @@ module github.com/datazip-inc/olake/drivers/hubspot -go 1.22 - -toolchain go1.22.3 +go 1.23 require ( github.com/datazip-inc/olake v0.0.0-20230727050722-6795340c7033 diff --git a/drivers/mongodb/examples/config.json b/drivers/mongodb/examples/config.json new file mode 100644 index 0000000..d087395 --- /dev/null +++ b/drivers/mongodb/examples/config.json @@ -0,0 +1,15 @@ +{ + "hosts": [ + "host1:27017", + "host2:27017", + "host3:27017" + ], + "username": "test", + "password": "test", + "authdb": "admin", + "replica-set": "rs0", + "read-preference": "secondaryPreferred", + "srv": true, + "server-ram": 16, + "databsae": "otter_db" +} \ No newline at end of file diff --git a/drivers/mongodb/internal/config.go b/drivers/mongodb/internal/config.go index 0bf23a7..f3f98bd 100644 --- a/drivers/mongodb/internal/config.go +++ b/drivers/mongodb/internal/config.go @@ -6,20 +6,20 @@ import ( ) type Config struct { - Hosts []string - Username string - Password string - AuthDB string - ReplicaSet string - ReadPreference string - Srv bool - ServerRAM uint - Database string + Hosts []string `json:"hosts"` + Username string `json:"username"` + Password string `json:"password"` + AuthDB string `json:"authdb"` + ReplicaSet string `json:"replica-set"` + ReadPreference string `json:"read-preference"` + Srv bool `json:"srv"` + ServerRAM uint `json:"server-ram"` + Database string `json:"databsae"` } func (c *Config) URI() string { return fmt.Sprintf( - "mongodb://%s:%s@%s?authSource=%s&replicaSet=%s&readPreference=%s", + "mongodb://%s:%s@%s/?authSource=%s&replicaSet=%s&readPreference=%s", c.Username, c.Password, strings.Join(c.Hosts, ","), c.AuthDB, c.ReplicaSet, c.ReadPreference, ) } diff --git a/drivers/mongodb/internal/mon.go b/drivers/mongodb/internal/mon.go index f737f90..33d8c8f 100644 --- a/drivers/mongodb/internal/mon.go +++ b/drivers/mongodb/internal/mon.go @@ -11,6 +11,7 @@ import ( "github.com/datazip-inc/olake/protocol" "github.com/datazip-inc/olake/types" "github.com/datazip-inc/olake/typeutils" + "github.com/piyushsingariya/relec" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" @@ -59,48 +60,59 @@ func (m *Mongo) Type() string { } func (m *Mongo) Discover() ([]*types.Stream, error) { + logger.Infof("Starting discover for MongoDB database %s", m.config.Database) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() + // TODO: Check must run before discover command + if m.client == nil { + client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(m.config.URI())) + if err != nil { + return nil, err + } + + m.client = client + } + database := m.client.Database(m.config.Database) + if m == nil || m.SourceStreams == nil { + fmt.Println("m is nil received") + } collections, err := database.ListCollections(ctx, bson.M{}) if err != nil { return nil, err } - // Channel to collect results var streams []*types.Stream - var mu sync.Mutex - var wg sync.WaitGroup - + var streamLock sync.Mutex + var streamNames []string // Iterate through collections and check if they are views for collections.Next(ctx) { var collectionInfo bson.M if err := collections.Decode(&collectionInfo); err != nil { - return nil, fmt.Errorf("failed to decode collection ") + return nil, fmt.Errorf("failed to decode collection: %s", err) } // Check if collection is a view if collectionType, ok := collectionInfo["type"].(string); ok && collectionType == "view" { continue } - wg.Add(1) - go func(colName string) { - defer wg.Done() - stream, err := m.produceCollectionSchema(context.TODO(), database, colName) - if err != nil { - logger.Errorf("failed to process collection[%s]: %s", colName, err) - return - } - mu.Lock() - streams = append(streams, stream) - mu.Unlock() - }(collectionInfo["name"].(string)) + streamNames = append(streamNames, collectionInfo["name"].(string)) } + return streams, relec.Concurrent(context.TODO(), streamNames, len(streamNames), func(ctx context.Context, streamName string, execNumber int) error { + stream, err := m.produceCollectionSchema(context.TODO(), database, streamName) + if err != nil { + return fmt.Errorf("failed to process collection[%s]: %s", streamName, err) + } - wg.Wait() + // cache stream + m.SourceStreams[stream.ID()] = stream - return streams, nil + streamLock.Lock() + streams = append(streams, stream) + streamLock.Unlock() + return nil + }) } func (m *Mongo) Read(pool *protocol.WriterPool, stream protocol.Stream) error { @@ -114,48 +126,54 @@ func (m *Mongo) Read(pool *protocol.WriterPool, stream protocol.Stream) error { return nil } -// fetch records from mongo and schema types -func (m *Mongo) produceCollectionSchema(ctx context.Context, db *mongo.Database, collectionName string) (*types.Stream, error) { - collection := db.Collection(collectionName) - stream := types.NewStream(collectionName, "") +// fetch schema types from mongo for streamName +func (m *Mongo) produceCollectionSchema(ctx context.Context, db *mongo.Database, streamName string) (*types.Stream, error) { + logger.Infof("Producing catalog schema for stream [%s]", streamName) - // default sync modes + // Initialize stream + collection := db.Collection(streamName) + stream := types.NewStream(streamName, "") stream.WithSyncMode(types.CDC, types.FULLREFRESH) - mongoIndexes, err := collection.Indexes().List(ctx, options.ListIndexes()) + + indexesCursor, err := collection.Indexes().List(ctx, options.ListIndexes()) if err != nil { return nil, err } + defer indexesCursor.Close(ctx) - for mongoIndexes.Next(ctx) { + for indexesCursor.Next(ctx) { var indexes bson.M - if err := mongoIndexes.Decode(&indexes); err != nil { + if err := indexesCursor.Decode(&indexes); err != nil { return nil, err } - for key, _ := range indexes["key"].(bson.M) { + for key := range indexes["key"].(bson.M) { stream.WithPrimaryKey(key) - stream.WithSyncMode(types.INCREMENTAL) } } - // iterate over 1k records to get data types - // currently only populating schema on level 1 - cursor, err := collection.Find(ctx, bson.D{}, options.Find().SetLimit(1000)) - if err != nil { - return nil, err + + // Define find options for fetching documents in ascending and descending order. + findOpts := []*options.FindOptions{ + options.Find().SetLimit(100000).SetSort(bson.D{{Key: "$natural", Value: 1}}), + options.Find().SetLimit(100000).SetSort(bson.D{{Key: "$natural", Value: -1}}), } - defer cursor.Close(ctx) - for cursor.Next(ctx) { - var row bson.M - if err := cursor.Decode(&row); err != nil { - return nil, err + return stream, relec.Concurrent(ctx, findOpts, len(findOpts), func(ctx context.Context, findOpt *options.FindOptions, execNumber int) error { + cursor, err := collection.Find(ctx, bson.D{}, findOpt) + if err != nil { + return err } + defer cursor.Close(ctx) - if err := typeutils.Resolve(stream, row); err != nil { - return nil, err - } - } - - m.SourceStreams[stream.ID()] = stream + for cursor.Next(ctx) { + var row bson.M + if err := cursor.Decode(&row); err != nil { + return err + } - return stream, nil + if err := typeutils.Resolve(stream, row); err != nil { + return err + } + } + return nil + }) } diff --git a/drivers/mongodb/main.go b/drivers/mongodb/main.go index ef49e0f..a122718 100644 --- a/drivers/mongodb/main.go +++ b/drivers/mongodb/main.go @@ -2,13 +2,16 @@ package main import ( "github.com/datazip-inc/olake" + "github.com/datazip-inc/olake/drivers/base" driver "github.com/datazip-inc/olake/drivers/mongodb/internal" "github.com/datazip-inc/olake/protocol" _ "github.com/jackc/pgx/v4/stdlib" ) func main() { - driver := &driver.Mongo{} + driver := &driver.Mongo{ + Driver: base.NewBase(), + } defer driver.Close() _ = protocol.BulkDriver(driver) diff --git a/drivers/postgres/go.mod b/drivers/postgres/go.mod index f3689cf..f8dd358 100644 --- a/drivers/postgres/go.mod +++ b/drivers/postgres/go.mod @@ -1,6 +1,6 @@ module github.com/datazip-inc/olake/drivers/postgres -go 1.22 +go 1.23 require ( github.com/lib/pq v1.10.2 diff --git a/drivers/s3/go.mod b/drivers/s3/go.mod index d0e4673..561ad56 100644 --- a/drivers/s3/go.mod +++ b/drivers/s3/go.mod @@ -1,8 +1,6 @@ module github.com/piyushsingariya/drivers/s3 -go 1.22 - -toolchain go1.22.3 +go 1.23 require github.com/datazip-inc/olake v0.0.0-20230727050722-6795340c7033 diff --git a/go.mod b/go.mod index 3c37c5f..b1ca19b 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,6 @@ module github.com/datazip-inc/olake -go 1.22 - -toolchain go1.22.3 +go 1.23 require ( github.com/go-playground/locales v0.14.1 diff --git a/go.work b/go.work index c207869..c548d45 100644 --- a/go.work +++ b/go.work @@ -1,4 +1,4 @@ -go 1.22 +go 1.23 use ( . @@ -6,4 +6,5 @@ use ( ./drivers/hubspot ./drivers/postgres ./drivers/s3 + ./drivers/mongodb ) diff --git a/go.work.sum b/go.work.sum index 781a817..19fa28d 100644 --- a/go.work.sum +++ b/go.work.sum @@ -139,7 +139,6 @@ github.com/Joker/jade v1.1.3/go.mod h1:T+2WLyt7VH6Lp0TRxQrUYEs64nRc83wkMQrfeIQKd github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06/go.mod h1:7erjKLwalezA0k99cWs5L11HWOAPNjdUZ6RxH1BXbbM= github.com/adrg/xdg v0.4.0/go.mod h1:N6ag73EX4wyxeaoeHctc1mas01KZgsj5tYiAIwqJE/E= github.com/alecthomas/participle/v2 v2.1.0/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c= -github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 h1:byKBBF2CKWBjjA4J1ZL2JXttJULvWSl50LegTyRZ728= github.com/apache/arrow/go/v12 v12.0.0/go.mod h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg= github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc= github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= @@ -179,6 +178,7 @@ github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SU github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= github.com/go-playground/validator v9.31.0+incompatible h1:UA72EPEogEnq76ehGdEDp4Mit+3FDh548oRqwVgNsHA= github.com/go-playground/validator v9.31.0+incompatible/go.mod h1:yrEkQXlcI+PugkyDjY2bRrL/UBU4f3rvrgkN3V8JEig= @@ -189,7 +189,6 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ= github.com/gomarkdown/markdown v0.0.0-20231222211730-1d6d20845b47/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA= github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-pkcs11 v0.2.1-0.20230907215043-c6f79328ddf9/go.mod h1:6eQoGcuNJpa7jnd5pMGdkSaQpNDYvPlXWMcjXXThLlY= github.com/google/pprof v0.0.0-20231101202521-4ca4178f5c7a/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= @@ -221,14 +220,11 @@ github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47 github.com/invopop/jsonschema v0.12.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= github.com/iris-contrib/schema v0.0.6/go.mod h1:iYszG0IOsuIsfzjymw1kMzTL8YQcCWlm65f3wX8J5iA= github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= -github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= github.com/jackc/pgconn v1.14.0 h1:vrbA9Ud87g6JdFWkHTJXppVce58qPIdP7N8y0Ml/A7Q= github.com/jackc/pgconn v1.14.0/go.mod h1:9mBNlny0UvkgJdCDvdVHYSjI+8tD2rnKK69Wz8ti++E= github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= github.com/jackc/pgproto3/v2 v2.3.2 h1:7eY55bdBeCz1F2fTzSz69QC+pG46jYq9/jtSPiJ5nn0= github.com/jackc/pgproto3/v2 v2.3.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgtype v1.14.0 h1:y+xUdabmyMkJLyApYuPj38mW+aAIqCe5uuBB51rH3Vw= -github.com/jackc/pgtype v1.14.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= github.com/jackc/pgx/v4 v4.18.1 h1:YP7G1KABtKpB5IHrO9vYwSrCOhs7p3uqhvhhQBptya0= github.com/jackc/pgx/v4 v4.18.1/go.mod h1:FydWkUyadDmdNH/mHnGob881GawxeEm7TcMCzkb+qQE= github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= @@ -249,6 +245,7 @@ github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZY github.com/labstack/echo/v4 v4.11.4/go.mod h1:noh7EvLwqDsmh/X/HWKPUl1AjzJrhyptRyEbQJfxen8= github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= github.com/lib/pq v1.10.4 h1:SO9z7FRPzA03QhHKJrH5BXA6HU1rS4V2nIVrrNC1iYk= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lyft/protoc-gen-star/v2 v2.0.3/go.mod h1:amey7yeodaJhXSbf/TlLvWiqQfLOSpEk//mLlc+axEk= github.com/mailgun/raymond/v2 v2.0.48/go.mod h1:lsgvL50kgt1ylcFJYZiULi5fjPBkkhNfj4KA0W54Z18= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= @@ -273,6 +270,7 @@ github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8P github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= @@ -281,6 +279,7 @@ github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPO github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= +github.com/shirou/gopsutil/v4 v4.24.8/go.mod h1:wE0OrJtj4dG+hYkxqDH3QiBICdKSf04/npcvLLc/oRg= github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= @@ -299,6 +298,8 @@ github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vl github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= @@ -311,6 +312,7 @@ github.com/yudai/gojsondiff v1.0.0 h1:27cbfqXLVEJ1o8I6v3y9lg8Ydm53EKqHXAOMxEGlCO github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg= github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3IfnEUduWvb9is428/nNb5L3U01M= github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM= +github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.etcd.io/etcd/api/v3 v3.5.9/go.mod h1:uyAal843mC8uUVSLWz6eHa/d971iDGnCRpmKd2Z+X8k= go.etcd.io/etcd/client/pkg/v3 v3.5.9/go.mod h1:y+CzeSmkMpWN2Jyu1npecjB9BBnABxGM4pN8cGuJeL4= go.etcd.io/etcd/client/v2 v2.305.9/go.mod h1:0NBdNx9wbxtEQLwAQtrDHwx58m02vXpDcgSYI2seohQ= @@ -327,15 +329,15 @@ go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naR golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA= golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= +golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= @@ -347,28 +349,28 @@ golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457/go.mod h1:pRgIJT+bRLFKnoM1ldnzKoxTIn14Yxz928LQRYYgIN0= golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/term v0.14.0/go.mod h1:TySc+nGkYR6qt8km8wUhuFRTVSMIX3XPR58y2lC8vww= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk= -golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= +golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= gonum.org/v1/gonum v0.12.0/go.mod h1:73TDxJfAAHeA8Mk9mf8NlIppyhQNo5GLTcYeqgo2lvY= google.golang.org/api v0.126.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvyo4Aw= google.golang.org/api v0.128.0/go.mod h1:Y611qgqaE92On/7g65MQgxYul3c0rEB894kniWLY750= diff --git a/types/stream.go b/types/stream.go index 7534b91..4504b6b 100644 --- a/types/stream.go +++ b/types/stream.go @@ -1,6 +1,8 @@ package types import ( + "sync" + "github.com/goccy/go-json" "github.com/datazip-inc/olake/jsonschema/schema" @@ -9,6 +11,7 @@ import ( // Output Stream Object for dsynk type Stream struct { + sync.Mutex // Name of the Stream Name string `json:"name,omitempty"` // Namespace of the Stream, or Database it belongs to @@ -35,6 +38,7 @@ func NewStream(name, namespace string) *Stream { SupportedSyncModes: NewSet[SyncMode](), SourceDefinedPrimaryKey: NewSet[string](), AvailableCursorFields: NewSet[string](), + Mutex: sync.Mutex{}, } } @@ -68,6 +72,7 @@ func (s *Stream) WithCursorField(columns ...string) *Stream { // Add or Update Column in Stream Type Schema func (s *Stream) UpsertField(column string, typ DataType, nullable bool) { + s.Lock() if s.Schema == nil { s.Schema = &TypeSchema{ Properties: map[string]*Property{}, @@ -83,6 +88,7 @@ func (s *Stream) UpsertField(column string, typ DataType, nullable bool) { } s.Schema.Properties[column] = property + s.Unlock() } func (s *Stream) WithSchema(schema TypeSchema) *Stream { diff --git a/typeutils/datatype.go b/typeutils/datatype.go index b5b81fd..f7716b1 100644 --- a/typeutils/datatype.go +++ b/typeutils/datatype.go @@ -11,6 +11,11 @@ import ( // TypeFromValue return DataType from v type func TypeFromValue(v interface{}) types.DataType { + // null check + if v == nil || (reflect.ValueOf(v).Kind() == reflect.Ptr && reflect.ValueOf(v).IsNil()) { + return types.NULL + } + switch reflect.TypeOf(v).Kind() { case reflect.Invalid: return types.NULL