Skip to content

Commit

Permalink
Added logic to respawnWriter on schema evolution
Browse files Browse the repository at this point in the history
  • Loading branch information
piyushdatazip committed Nov 7, 2024
1 parent 12861be commit e8d5350
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
1 change: 1 addition & 0 deletions protocol/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func CreateRootCommand(forDriver bool, driver any) *cobra.Command {
func init() {
commands = append(commands, SpecCmd, CheckCmd, DiscoverCmd, ReadCmd)
RootCmd.PersistentFlags().StringVarP(&config_, "config", "", "", "(Required) Config for connector")
RootCmd.PersistentFlags().StringVarP(&destinationConfig_, "destination", "", "", "(Required) Destination config for connector")
RootCmd.PersistentFlags().StringVarP(&catalog_, "catalog", "", "", "(Required) Catalog for connector")
RootCmd.PersistentFlags().StringVarP(&state_, "state", "", "", "(Required) State for connector")
RootCmd.PersistentFlags().UintVarP(&batchSize_, "batch", "", 10000, "(Optional) Batch size for connector")
Expand Down
40 changes: 28 additions & 12 deletions protocol/writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var RegisteredWriters = map[types.AdapterType]NewFunc{}
type WriterPool struct {
recordCount atomic.Int64
threadCounter atomic.Int64 // Used in naming files in S3 and global count for threads
config any // respective adapter config
config any // respective writer config
init NewFunc // To initialize exclusive destination threads
group *errgroup.Group
groupCtx context.Context
Expand Down Expand Up @@ -53,48 +53,64 @@ func NewWriter(ctx context.Context, config *types.WriterConfig) (*WriterPool, er
}

// Initialize new adapter thread for writing into destination
func (w *WriterPool) NewThread(ctx context.Context, stream Stream) (chan types.Record, error) {
adapter := w.init()
if err := utils.Unmarshal(w.config, adapter.GetConfigRef()); err != nil {
func (w *WriterPool) NewThread(parent context.Context, stream Stream) (chan types.Record, error) {
thread := w.init()
if err := utils.Unmarshal(w.config, thread.GetConfigRef()); err != nil {
return nil, err
}

if err := adapter.Setup(stream); err != nil {
if err := thread.Setup(stream); err != nil {
return nil, err
}

w.threadCounter.Add(1)
frontend := make(chan types.Record)
backend := make(chan types.Record)
child, childCancel := context.WithCancel(parent)

// spawnWriter spawns a writer process with child context
spawnWriter := func() {
w.group.Go(func() error {
defer thread.Close()
defer w.threadCounter.Add(-1)

return thread.Write(child, backend)
})
}

w.group.Go(func() error {
defer safego.Close(backend)
// not defering canceling the child context so that writing process
// can finish writing all the records pushed into the channel

main:
for {
select {
case <-ctx.Done():
case <-parent.Done():
break main
default:
record, ok := <-frontend
if !ok {
break main
}

// TODO: handle schema evolution here
if thread.ReInitiationRequiredOnSchemaEvolution() {
childCancel() // Close the current writer and spawn new
child, childCancel = context.WithCancel(parent) // replace the original child context and cancel function
spawnWriter() // spawn a writer with newer context
}

w.recordCount.Add(1) // increase the record count
backend <- record
w.logState()
}
}

return nil
})
w.group.Go(func() error {
defer adapter.Close()
defer w.threadCounter.Add(-1)

return adapter.Write(ctx, backend)
})

spawnWriter()
return frontend, nil
}

Expand Down
3 changes: 1 addition & 2 deletions writers/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func (l *Local) Check() error {
}

func (l *Local) Write(ctx context.Context, channel <-chan types.Record) error {
defer l.Close()
iteration:
for {
select {
Expand All @@ -102,7 +101,7 @@ iteration:
}
}

return l.Close()
return nil
}

func (l *Local) ReInitiationRequiredOnSchemaEvolution() bool {
Expand Down

0 comments on commit e8d5350

Please sign in to comment.