-
Notifications
You must be signed in to change notification settings - Fork 43
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
WIP: MongoDB Discover and CDC Sync (#9)
Signed-off-by: Piyush Singariya <[email protected]> Co-authored-by: Piyush Singariya <[email protected]> Co-authored-by: Piyush Singariya <[email protected]>
- Loading branch information
1 parent
9391b05
commit 0b1fdb7
Showing
54 changed files
with
5,315 additions
and
750 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,4 @@ | ||
g5 | ||
.vscode | ||
test | ||
.DS_Store |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
package constants | ||
|
||
const ( | ||
ParquetFileExt = "parquet" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,40 +1,74 @@ | ||
package base | ||
|
||
import ( | ||
"sync" | ||
|
||
"github.com/datazip-inc/olake/protocol" | ||
"github.com/datazip-inc/olake/types" | ||
"github.com/datazip-inc/olake/typeutils" | ||
) | ||
|
||
type Driver struct { | ||
SourceStreams map[string]*types.Stream // locally cached streams; It contains all streams | ||
GroupRead bool // Used in CDC mode | ||
cachedStreams sync.Map // locally cached streams; It contains all streams | ||
CDCSupport bool // Used in CDC mode | ||
} | ||
|
||
func (d *Driver) ChangeStreamSupported() bool { | ||
return d.GroupRead | ||
return d.CDCSupport | ||
} | ||
|
||
// Returns all the possible streams available in the source | ||
func (d *Driver) GetStreams() []*types.Stream { | ||
streams := []*types.Stream{} | ||
d.cachedStreams.Range(func(key, value any) bool { | ||
streams = append(streams, value.(*types.Stream)) | ||
|
||
return true | ||
}) | ||
|
||
return streams | ||
} | ||
|
||
func (d *Driver) AddStream(stream *types.Stream) { | ||
d.cachedStreams.Store(stream.ID(), stream) | ||
} | ||
|
||
func (d *Driver) UpdateState(stream protocol.Stream, data types.Record) error { | ||
func (d *Driver) GetStream(streamID string) (bool, *types.Stream) { | ||
val, found := d.cachedStreams.Load(streamID) | ||
if !found { | ||
return found, nil | ||
} | ||
|
||
return found, val.(*types.Stream) | ||
} | ||
|
||
func (d *Driver) UpdateStateCursor(stream protocol.Stream, data types.Record) error { | ||
datatype, err := stream.Schema().GetType(stream.Cursor()) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if cursorVal, found := data[stream.Cursor()]; found && cursorVal != nil { | ||
// compare with current state | ||
if stream.GetState() != nil { | ||
state, err := typeutils.MaximumOnDataType(datatype, stream.GetState(), cursorVal) | ||
if stream.GetStateCursor() != nil { | ||
state, err := typeutils.MaximumOnDataType(datatype, stream.GetStateCursor(), cursorVal) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
stream.SetState(state) | ||
stream.SetStateCursor(state) | ||
} else { | ||
// directly update | ||
stream.SetState(cursorVal) | ||
stream.SetStateCursor(cursorVal) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func NewBase() *Driver { | ||
return &Driver{ | ||
cachedStreams: sync.Map{}, | ||
CDCSupport: false, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.