Skip to content

Commit

Permalink
Make callback writer a buffering writer
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyhb committed Sep 20, 2024
1 parent bf1797e commit e405365
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 113 deletions.
8 changes: 5 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ func main() {
panic(err)
}

writer := eventwriter.NewCallbackWriter(ctx, func(cs *changeset.Changeset) {
if cs == nil {
return
writer := eventwriter.NewCallbackWriter(ctx, 1, func(batch []*changeset.Changeset) error {
if len(batch) == 0 || batch[0] == nil {
return nil
}
cs := batch[0]
evt := eventwriter.ChangesetToEvent(*cs)
byt, _ := json.Marshal(evt)
fmt.Println(string(byt))
return nil
})
csChan := writer.Listen(ctx, r)

Expand Down
102 changes: 10 additions & 92 deletions pkg/eventwriter/api_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,14 @@ package eventwriter
import (
"context"
"fmt"
"sync"
"time"

"github.com/inngest/dbcap/pkg/changeset"
"github.com/inngest/inngestgo"
)

func NewAPIClientWriter(
ctx context.Context,
client inngestgo.Client,
batchSize int,
) EventWriter {
cs := make(chan *changeset.Changeset, batchSize)
return &apiWriter{
client: client,
cs: cs,
batchSize: batchSize,
wg: sync.WaitGroup{},
}
}

// ChangesetToEvent returns a map containing event data for the given changeset.
func ChangesetToEvent(cs changeset.Changeset) map[string]any {

var name string

if cs.Data.Table == "" {
Expand All @@ -42,83 +26,17 @@ func ChangesetToEvent(cs changeset.Changeset) map[string]any {
}
}

type apiWriter struct {
client inngestgo.Client
cs chan *changeset.Changeset
batchSize int

wg sync.WaitGroup
}

func (a *apiWriter) Listen(ctx context.Context, committer changeset.WatermarkCommitter) chan *changeset.Changeset {
a.wg.Add(1)
go func() {
defer a.wg.Done()

i := 0
buf := make([]*changeset.Changeset, a.batchSize)

// sendCtx is an additional uncancelled CTX which will be cancelled
// 5 seconds after the
for {
timer := time.NewTimer(batchTimeout)

select {
case <-ctx.Done():
// Shutting down. Send the existing batch.
if err := a.send(buf); err != nil {
// TODO: Fail. What do we do here?
} else {
committer.Commit(buf[i-1].Watermark)
}
return
case <-timer.C:
// Force sending current batch
if i == 0 {
timer.Reset(batchTimeout)
continue
}

// We have events after a timeout - send them.
if err := a.send(buf); err != nil {
// TODO: Fail. What do we do here?
} else {
// Commit the last LSN.
committer.Commit(buf[i-1].Watermark)
}

// reset the buffer
buf = make([]*changeset.Changeset, a.batchSize)
i = 0
case msg := <-a.cs:
if i == a.batchSize {
// send this batch, as we're full.
if err := a.send(buf); err != nil {
// TODO: Fail. What do we do here?
} else {
committer.Commit(buf[i-1].Watermark)
}
// reset the buffer
buf = make([]*changeset.Changeset, a.batchSize)
i = 0
continue
}
// Appoend the
buf[i] = msg
i++
// Send this batch after at least 5 seconds
timer.Reset(batchTimeout)
}
}
}()
return a.cs
}

func (a *apiWriter) Wait() {
a.wg.Wait()
func NewAPIClientWriter(
ctx context.Context,
batchSize int,
client inngestgo.Client,
) EventWriter {
return NewCallbackWriter(ctx, batchSize, func(cs []*changeset.Changeset) error {
return send(client, cs)
})
}

func (a *apiWriter) send(batch []*changeset.Changeset) error {
func send(client inngestgo.Client, batch []*changeset.Changeset) error {
// Always use a new cancel here so that when we quit polling
// the HTTP request continues.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand All @@ -137,6 +55,6 @@ func (a *apiWriter) send(batch []*changeset.Changeset) error {
return nil
}

_, err := a.client.SendMany(ctx, evts)
_, err := client.SendMany(ctx, evts)
return err
}
84 changes: 71 additions & 13 deletions pkg/eventwriter/callback_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,102 @@ package eventwriter

import (
"context"
"sync"
"time"

"github.com/inngest/dbcap/pkg/changeset"
)

// NewCallbackWriter is a simple writer which calls a callback for a given changeset.
//
// This is primarily used for testing.
func NewCallbackWriter(ctx context.Context, onChangeset func(cs *changeset.Changeset)) EventWriter {
cs := make(chan *changeset.Changeset)
func NewCallbackWriter(
ctx context.Context,
batchSize int,
onChangeset func(cs []*changeset.Changeset) error,
) EventWriter {
cs := make(chan *changeset.Changeset, batchSize)
return &cbWriter{
cs: cs,
onChangeset: onChangeset,
cs: cs,
batchSize: batchSize,
wg: sync.WaitGroup{},
}
}

type cbWriter struct {
onChangeset func(cs *changeset.Changeset)
cs chan *changeset.Changeset
onChangeset func([]*changeset.Changeset) error

cs chan *changeset.Changeset
batchSize int

wg sync.WaitGroup
}

func (w *cbWriter) Listen(ctx context.Context, committer changeset.WatermarkCommitter) chan *changeset.Changeset {
func (a *cbWriter) Listen(ctx context.Context, committer changeset.WatermarkCommitter) chan *changeset.Changeset {
a.wg.Add(1)
go func() {
defer a.wg.Done()

i := 0
buf := make([]*changeset.Changeset, a.batchSize)

// sendCtx is an additional uncancelled CTX which will be cancelled
// 5 seconds after the
for {
timer := time.NewTimer(batchTimeout)

select {
case <-ctx.Done():
// Shutting down. Send the existing batch.
if err := a.onChangeset(buf); err != nil {
// TODO: Fail. What do we do here?
} else {
committer.Commit(buf[i-1].Watermark)
}
return
case msg := <-w.cs:
if msg == nil {
case <-timer.C:
// Force sending current batch
if i == 0 {
timer.Reset(batchTimeout)
continue
}
w.onChangeset(msg)
if committer != nil {
committer.Commit(msg.Watermark)

// We have events after a timeout - send them.
if err := a.onChangeset(buf); err != nil {
// TODO: Fail. What do we do here?
} else {
// Commit the last LSN.
committer.Commit(buf[i-1].Watermark)
}

// reset the buffer
buf = make([]*changeset.Changeset, a.batchSize)
i = 0
case msg := <-a.cs:
if i == a.batchSize {
// send this batch, as we're full.
if err := a.onChangeset(buf); err != nil {
// TODO: Fail. What do we do here?
} else {
committer.Commit(buf[i-1].Watermark)
}
// reset the buffer
buf = make([]*changeset.Changeset, a.batchSize)
i = 0
continue
}
// Appoend the
buf[i] = msg
i++
// Send this batch after at least 5 seconds
timer.Reset(batchTimeout)
}
}
}()
return w.cs
return a.cs
}

func (w *cbWriter) Wait() {}
func (a *cbWriter) Wait() {
a.wg.Wait()
}
17 changes: 12 additions & 5 deletions pkg/replicator/pgreplicator/pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ func TestCommit(t *testing.T) {

// Set up event writer which listens to changes
var latestReceivedLSN pglogrepl.LSN
cb := eventwriter.NewCallbackWriter(ctx, func(cs *changeset.Changeset) {
latestReceivedLSN = cs.Watermark.LSN
cb := eventwriter.NewCallbackWriter(ctx, 1, func(cs []*changeset.Changeset) error {
latestReceivedLSN = cs[0].Watermark.LSN
return nil
})
csChan := cb.Listen(ctx, r)
// Star the replicator which forwards to our event writer
Expand Down Expand Up @@ -142,7 +143,8 @@ func TestInsert(t *testing.T) {
inserts int32
)

cb := eventwriter.NewCallbackWriter(ctx, func(cs *changeset.Changeset) {
cb := eventwriter.NewCallbackWriter(ctx, 1, func(batch []*changeset.Changeset) error {
cs := batch[0]
next := atomic.AddInt32(&inserts, 1)

if next == 1 {
Expand Down Expand Up @@ -188,6 +190,7 @@ func TestInsert(t *testing.T) {
cs.Data.New,
)
}
return nil
})
csChan := cb.Listen(ctx, r)

Expand Down Expand Up @@ -242,7 +245,8 @@ func TestUpdateMany_ReplicaIdentityFull(t *testing.T) {
updates int32
)

cb := eventwriter.NewCallbackWriter(ctx, func(cs *changeset.Changeset) {
cb := eventwriter.NewCallbackWriter(ctx, 1, func(batch []*changeset.Changeset) error {
cs := batch[0]
atomic.AddInt32(&total, 1)

if cs.Operation == changeset.OperationUpdate {
Expand Down Expand Up @@ -337,6 +341,7 @@ func TestUpdateMany_ReplicaIdentityFull(t *testing.T) {
case 52:
require.EqualValues(t, changeset.OperationCommit, cs.Operation)
}
return nil
})
csChan := cb.Listen(ctx, r)

Expand Down Expand Up @@ -393,7 +398,8 @@ func TestUpdateMany_DisableReplicaIdentityFull(t *testing.T) {
updates int32
)

cb := eventwriter.NewCallbackWriter(ctx, func(cs *changeset.Changeset) {
cb := eventwriter.NewCallbackWriter(ctx, 1, func(batch []*changeset.Changeset) error {
cs := batch[0]
atomic.AddInt32(&total, 1)

if cs.Operation == changeset.OperationUpdate {
Expand Down Expand Up @@ -455,6 +461,7 @@ func TestUpdateMany_DisableReplicaIdentityFull(t *testing.T) {
case 52:
require.EqualValues(t, changeset.OperationCommit, cs.Operation)
}
return nil
})
csChan := cb.Listen(ctx, r)

Expand Down

0 comments on commit e405365

Please sign in to comment.