Skip to content

Commit

Permalink
feat: discover command running success
Browse files Browse the repository at this point in the history
  • Loading branch information
hash-data committed Nov 13, 2024
1 parent 6c18ced commit 5693351
Show file tree
Hide file tree
Showing 15 changed files with 143 additions and 95 deletions.
15 changes: 7 additions & 8 deletions build.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
function fail() {
local error="$*" || 'Unknown error'
local error="${*:-Unknown error}"
echo "$(chalk red "${error}")"
exit 1
}
Expand All @@ -15,7 +15,7 @@ function build_and_run() {
else
fail "The argument does not have a recognized prefix."
fi
cd $path
cd $path || fail "Failed to navigate to path: $path"
go mod tidy
go build -ldflags="-w -s -X constants/constants.version=${GIT_VERSION} -X constants/constants.commitsha=${GIT_COMMITSHA} -X constants/constants.releasechannel=${RELEASE_CHANNEL}" -o g5 main.go

Expand All @@ -26,9 +26,8 @@ function build_and_run() {
if [ $# -gt 0 ]; then
argument="$1"

# Capture and join remaining arguments
g5
remaining_arguments=("$@")
# Capture and join remaining arguments, skipping the first one
remaining_arguments=("${@:2}")
joined_arguments=$(
IFS=' '
echo "${remaining_arguments[*]}"
Expand All @@ -37,14 +36,14 @@ if [ $# -gt 0 ]; then
if [[ $argument == driver-* ]]; then
driver="${argument#driver-}"
echo "============================== Building driver: $driver =============================="
build_and_run "$driver" "driver" $joined_arguments
build_and_run "$driver" "driver" "$joined_arguments"
elif [[ $argument == adapter-* ]]; then
adapter="${argument#adapter-}"
echo "============================== Building adapter: $adapter =============================="
build_and_run "$adapter" "adapter" $joined_arguments
build_and_run "$adapter" "adapter" "$joined_arguments"
else
fail "The argument does not have a recognized prefix."
fi
else
fail "No arguments provided."
fi
fi
7 changes: 7 additions & 0 deletions drivers/base/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ func (d *Driver) UpdateState(stream protocol.Stream, data types.Record) error {

return nil
}

// NewBase returns a freshly allocated Driver for concrete drivers to embed.
//
// SourceStreams is initialised to an empty, non-nil map so that streams can
// be stored in it immediately (assigning into a nil map panics). GroupRead
// is explicitly left disabled.
func NewBase() *Driver {
	base := new(Driver)
	base.SourceStreams = map[string]*types.Stream{}
	base.GroupRead = false
	return base
}
4 changes: 1 addition & 3 deletions drivers/google-sheets/go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/datazip-inc/olake/drivers/google-sheets

go 1.22

toolchain go1.22.3
go 1.23

require (
github.com/datazip-inc/olake v0.0.0-00010101000000-000000000000
Expand Down
4 changes: 1 addition & 3 deletions drivers/hubspot/go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/datazip-inc/olake/drivers/hubspot

go 1.22

toolchain go1.22.3
go 1.23

require (
github.com/datazip-inc/olake v0.0.0-20230727050722-6795340c7033
Expand Down
15 changes: 15 additions & 0 deletions drivers/mongodb/examples/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"hosts": [
"host1:27017",
"host2:27017",
"host3:27017"
],
"username": "test",
"password": "test",
"authdb": "admin",
"replica-set": "rs0",
"read-preference": "secondaryPreferred",
"srv": true,
"server-ram": 16,
"databsae": "otter_db"
}
20 changes: 10 additions & 10 deletions drivers/mongodb/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ import (
)

type Config struct {
Hosts []string
Username string
Password string
AuthDB string
ReplicaSet string
ReadPreference string
Srv bool
ServerRAM uint
Database string
Hosts []string `json:"hosts"`
Username string `json:"username"`
Password string `json:"password"`
AuthDB string `json:"authdb"`
ReplicaSet string `json:"replica-set"`
ReadPreference string `json:"read-preference"`
Srv bool `json:"srv"`
ServerRAM uint `json:"server-ram"`
Database string `json:"databsae"`
}

func (c *Config) URI() string {
return fmt.Sprintf(
"mongodb://%s:%s@%s?authSource=%s&replicaSet=%s&readPreference=%s",
"mongodb://%s:%s@%s/?authSource=%s&replicaSet=%s&readPreference=%s",
c.Username, c.Password, strings.Join(c.Hosts, ","), c.AuthDB, c.ReplicaSet, c.ReadPreference,
)
}
110 changes: 64 additions & 46 deletions drivers/mongodb/internal/mon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/datazip-inc/olake/protocol"
"github.com/datazip-inc/olake/types"
"github.com/datazip-inc/olake/typeutils"
"github.com/piyushsingariya/relec"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
Expand Down Expand Up @@ -59,48 +60,59 @@ func (m *Mongo) Type() string {
}

func (m *Mongo) Discover() ([]*types.Stream, error) {
logger.Infof("Starting discover for MongoDB database %s", m.config.Database)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

// TODO: Check must run before discover command
if m.client == nil {
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(m.config.URI()))
if err != nil {
return nil, err
}

m.client = client
}

database := m.client.Database(m.config.Database)
if m == nil || m.SourceStreams == nil {
fmt.Println("m is nil received")
}
collections, err := database.ListCollections(ctx, bson.M{})
if err != nil {
return nil, err
}

// Channel to collect results
var streams []*types.Stream
var mu sync.Mutex
var wg sync.WaitGroup

var streamLock sync.Mutex
var streamNames []string
// Iterate through collections and check if they are views
for collections.Next(ctx) {
var collectionInfo bson.M
if err := collections.Decode(&collectionInfo); err != nil {
return nil, fmt.Errorf("failed to decode collection ")
return nil, fmt.Errorf("failed to decode collection: %s", err)
}

// Check if collection is a view
if collectionType, ok := collectionInfo["type"].(string); ok && collectionType == "view" {
continue
}
wg.Add(1)
go func(colName string) {
defer wg.Done()
stream, err := m.produceCollectionSchema(context.TODO(), database, colName)
if err != nil {
logger.Errorf("failed to process collection[%s]: %s", colName, err)
return
}
mu.Lock()
streams = append(streams, stream)
mu.Unlock()
}(collectionInfo["name"].(string))
streamNames = append(streamNames, collectionInfo["name"].(string))
}
return streams, relec.Concurrent(context.TODO(), streamNames, len(streamNames), func(ctx context.Context, streamName string, execNumber int) error {
stream, err := m.produceCollectionSchema(context.TODO(), database, streamName)
if err != nil {
return fmt.Errorf("failed to process collection[%s]: %s", streamName, err)
}

wg.Wait()
// cache stream
m.SourceStreams[stream.ID()] = stream

return streams, nil
streamLock.Lock()
streams = append(streams, stream)
streamLock.Unlock()
return nil
})
}

func (m *Mongo) Read(pool *protocol.WriterPool, stream protocol.Stream) error {
Expand All @@ -114,48 +126,54 @@ func (m *Mongo) Read(pool *protocol.WriterPool, stream protocol.Stream) error {
return nil
}

// fetch records from mongo and schema types
func (m *Mongo) produceCollectionSchema(ctx context.Context, db *mongo.Database, collectionName string) (*types.Stream, error) {
collection := db.Collection(collectionName)
stream := types.NewStream(collectionName, "")
// fetch schema types from mongo for streamName
func (m *Mongo) produceCollectionSchema(ctx context.Context, db *mongo.Database, streamName string) (*types.Stream, error) {
logger.Infof("Producing catalog schema for stream [%s]", streamName)

// default sync modes
// Initialize stream
collection := db.Collection(streamName)
stream := types.NewStream(streamName, "")
stream.WithSyncMode(types.CDC, types.FULLREFRESH)
mongoIndexes, err := collection.Indexes().List(ctx, options.ListIndexes())

indexesCursor, err := collection.Indexes().List(ctx, options.ListIndexes())
if err != nil {
return nil, err
}
defer indexesCursor.Close(ctx)

for mongoIndexes.Next(ctx) {
for indexesCursor.Next(ctx) {
var indexes bson.M
if err := mongoIndexes.Decode(&indexes); err != nil {
if err := indexesCursor.Decode(&indexes); err != nil {
return nil, err
}
for key, _ := range indexes["key"].(bson.M) {
for key := range indexes["key"].(bson.M) {
stream.WithPrimaryKey(key)
stream.WithSyncMode(types.INCREMENTAL)
}
}
// iterate over 1k records to get data types
// currently only populating schema on level 1
cursor, err := collection.Find(ctx, bson.D{}, options.Find().SetLimit(1000))
if err != nil {
return nil, err

// Define find options for fetching documents in ascending and descending order.
findOpts := []*options.FindOptions{
options.Find().SetLimit(100000).SetSort(bson.D{{Key: "$natural", Value: 1}}),
options.Find().SetLimit(100000).SetSort(bson.D{{Key: "$natural", Value: -1}}),
}
defer cursor.Close(ctx)

for cursor.Next(ctx) {
var row bson.M
if err := cursor.Decode(&row); err != nil {
return nil, err
return stream, relec.Concurrent(ctx, findOpts, len(findOpts), func(ctx context.Context, findOpt *options.FindOptions, execNumber int) error {
cursor, err := collection.Find(ctx, bson.D{}, findOpt)
if err != nil {
return err
}
defer cursor.Close(ctx)

if err := typeutils.Resolve(stream, row); err != nil {
return nil, err
}
}

m.SourceStreams[stream.ID()] = stream
for cursor.Next(ctx) {
var row bson.M
if err := cursor.Decode(&row); err != nil {
return err
}

return stream, nil
if err := typeutils.Resolve(stream, row); err != nil {
return err
}
}
return nil
})
}
5 changes: 4 additions & 1 deletion drivers/mongodb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package main

import (
"github.com/datazip-inc/olake"
"github.com/datazip-inc/olake/drivers/base"
driver "github.com/datazip-inc/olake/drivers/mongodb/internal"
"github.com/datazip-inc/olake/protocol"
_ "github.com/jackc/pgx/v4/stdlib"
)

func main() {
driver := &driver.Mongo{}
driver := &driver.Mongo{
Driver: base.NewBase(),
}
defer driver.Close()

_ = protocol.BulkDriver(driver)
Expand Down
2 changes: 1 addition & 1 deletion drivers/postgres/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/datazip-inc/olake/drivers/postgres

go 1.22
go 1.23

require (
github.com/lib/pq v1.10.2
Expand Down
4 changes: 1 addition & 3 deletions drivers/s3/go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/piyushsingariya/drivers/s3

go 1.22

toolchain go1.22.3
go 1.23

require github.com/datazip-inc/olake v0.0.0-20230727050722-6795340c7033

Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/datazip-inc/olake

go 1.22

toolchain go1.22.3
go 1.23

require (
github.com/go-playground/locales v0.14.1
Expand Down
3 changes: 2 additions & 1 deletion go.work
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
go 1.22
go 1.23

use (
.
./drivers/google-sheets
./drivers/hubspot
./drivers/postgres
./drivers/s3
./drivers/mongodb
)
Loading

0 comments on commit 5693351

Please sign in to comment.