Skip to content

Commit

Permalink
add recover-forever option
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Sep 18, 2024
1 parent 183abf8 commit e157b9c
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 13 deletions.
11 changes: 11 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ type poptions struct {
backoffResetTime time.Duration
hotStandby bool
recoverAhead bool
recoverForever bool
producerDefaultHeaders Headers

builders struct {
Expand Down Expand Up @@ -274,6 +275,16 @@ func WithRecoverAhead() ProcessorOption {
}
}

// WithRecoverForever configures the processor to recover joins and the processor table forever, without ever joining a group.
// Using this option is highly experimental and is typically used for cluster-migration-scenarios where mirror-maker cannot synchronize
// the consumer-group-offsets.
// If this option is passed, the processor will never actually recover, so `proc.Recovered()` will always return `false`
func WithRecoverForever() ProcessorOption {
return func(o *poptions, gg *GroupGraph) {
o.recoverForever = true
}
}

// WithGroupGraphHook allows a function to obtain the group graph when a processor is started.
func WithGroupGraphHook(hook func(gg *GroupGraph)) ProcessorOption {
return func(o *poptions, gg *GroupGraph) {
Expand Down
23 changes: 21 additions & 2 deletions partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ const (
runModePassive
// the processor only recovers once and then stops. This is used for recover-ahead-option
runModeRecoverOnly
// the processor only recovers forever, never stops. This is used for the RecoverForever option to keep the processor tables up to date
// without actcually processing anything.
runModeRecoverForever
)

type visit struct {
Expand Down Expand Up @@ -211,7 +214,15 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
setupErrg.Go(func() error {
pp.log.Debugf("catching up table")
defer pp.log.Debugf("catching up table done")
return pp.table.SetupAndRecover(setupCtx, false)

err := pp.table.SetupAndRecover(setupCtx, false)
if err != nil {
return err
}
if pp.runMode == runModeRecoverForever {
return pp.table.CatchupForever(setupCtx, false)
}
return nil
})
}

Expand All @@ -229,7 +240,15 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
pp.joins[join.Topic()] = table

setupErrg.Go(func() error {
return table.SetupAndRecover(setupCtx, false)
err := table.SetupAndRecover(setupCtx, false)
if err != nil {
return err
}

if pp.runMode == runModeRecoverForever {
return table.CatchupForever(setupCtx, false)
}
return nil
})
}

Expand Down
26 changes: 21 additions & 5 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,11 @@ func (g *Processor) Run(ctx context.Context) (rerr error) {
return fmt.Errorf("error waiting for start up tables: %w", err)
}

if ctx.Err() != nil {
g.log.Printf("Shutting down processor before it starts, context was cancelled")
return nil
}

// run the main rebalance-consume-loop
errg.Go(func() error {
return g.rebalanceLoop(ctx)
Expand Down Expand Up @@ -466,17 +471,22 @@ func (g *Processor) waitForStartupTables(ctx context.Context) error {
}
g.mTables.RUnlock()

// If we recover ahead, we'll also start all partition processors once in recover-only-mode
// If we recover ahead (or forever), we'll also start all partition processors once in recover-only-mode (or recover-forever-mode)
// and do the same boilerplate to keep the waitmap up to date.
if g.opts.recoverAhead {
if g.opts.recoverAhead || g.opts.recoverForever {

mode := runModeRecoverOnly
if g.opts.recoverForever {
mode = runModeRecoverForever
}
partitions, err := g.findStatefulPartitions()
if err != nil {
return fmt.Errorf("error finding dependent partitions: %w", err)
}
for _, part := range partitions {
part := part
pproc, err := g.createPartitionProcessor(ctx, part, runModeRecoverOnly, func(msg *message, meta string) {
panic("a partition processor in recover-only-mode never commits a message")
pproc, err := g.createPartitionProcessor(ctx, part, mode, func(msg *message, meta string) {
panic("a partition processor in recover-only-mode/recover-forever-mode never commits a message")
})
if err != nil {
return fmt.Errorf("Error creating partition processor for recover-ahead %s/%d: %v", g.Graph().Group(), part, err)
Expand Down Expand Up @@ -517,7 +527,13 @@ func (g *Processor) waitForStartupTables(ctx context.Context) error {
select {
// the context has closed, no point in waiting
case <-ctx.Done():
g.log.Debugf("Stopping to wait for views to get up, context closed")
g.log.Debugf("Stopping to wait for tables to get up, context closed")

// if we're in recover-forever-mode, this is the normal way of shutdown, so no error here.
if g.opts.recoverForever {
return nil
}

return fmt.Errorf("context closed while waiting for startup tables to become ready")

// the error group is done, which means
Expand Down
143 changes: 137 additions & 6 deletions systemtest/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"os"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -241,9 +242,7 @@ func TestRecoverAhead(t *testing.T) {
return proc2.Run(ctx)
})

pollTimed(t, "procs 1&2 recovered", func() bool {
return true
}, proc1.Recovered, proc2.Recovered)
pollTimed(t, "procs 1&2 recovered", proc1.Recovered, proc2.Recovered)

// check the storages that were initalized by the processors:
// both have each 2 storages, because all tables only have 1 partition
Expand All @@ -259,9 +258,6 @@ func TestRecoverAhead(t *testing.T) {
// wait until the keys are present
pollTimed(t, "key-values are present",

func() bool {
return true
},
func() bool {
has, _ := tableStorage1.Has("key1")
return has
Expand Down Expand Up @@ -297,6 +293,141 @@ func TestRecoverAhead(t *testing.T) {
require.NoError(t, errg.Wait().ErrorOrNil())
}

func TestRecoverForever(t *testing.T) {
brokers := initSystemTest(t)
var (
group = goka.Group(fmt.Sprintf("goka-systemtest-recoverforever-%d", time.Now().Unix()))
inputStream = fmt.Sprintf("%s-input", group)
table = string(goka.GroupTable(group))
)

tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
cfg := goka.DefaultConfig()
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
goka.ReplaceGlobalConfig(cfg)

tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
require.NoError(t, err)

err = tm.EnsureStreamExists(inputStream, 1)
require.NoError(t, err)
err = tm.EnsureTableExists(string(table), 1)
require.NoError(t, err)

// emit something into the state table (like simulating a processor ctx.SetValue()).
// Our test processors should update their value in the join-table
tableEmitter, err := goka.NewEmitter(brokers, goka.Stream(table), new(codec.String))
require.NoError(t, err)
require.NoError(t, tableEmitter.EmitSync("key1", "tableval1"))
require.NoError(t, tableEmitter.Finish())

// emit an input-message
inputEmitter, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String))
require.NoError(t, err)
require.NoError(t, inputEmitter.EmitSync("key1", "input-value"))
require.NoError(t, inputEmitter.Finish())

storageTracker := newStorageTracker()

var (
processed atomic.Int64
itemsRecovered atomic.Int64
)

createProc := func(recoverForever bool) *goka.Processor {
opts := []goka.ProcessorOption{

goka.WithUpdateCallback(func(ctx goka.UpdateContext, s storage.Storage, key string, value []byte) error {
itemsRecovered.Add(1)
return goka.DefaultUpdate(ctx, s, key, value)
}),
goka.WithStorageBuilder(storageTracker.Build),
}
if recoverForever {
opts = append(opts, goka.WithRecoverForever())
}
proc, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) {
processed.Add(1)
}),
goka.Persist(new(codec.String)),
),
opts...,
)
require.NoError(t, err)
return proc
}

proc1 := createProc(true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errg, ctx := multierr.NewErrGroup(ctx)

errg.Go(func() error {
return proc1.Run(ctx)
})

pollTimed(t, "procs 1 storage initialized", func() bool {
return len(storageTracker.storages) == 1
})

// get the corresponding storages for both table and join-partitions
tableStorage1 := storageTracker.storages[storageTracker.key(string(table), 0)]

// wait until the keys are present
pollTimed(t, "key-values are present",
func() bool {
has, _ := tableStorage1.Has("key1")
return has
},
)

// check the table-values
val1, _ := tableStorage1.Get("key1")
require.Equal(t, "tableval1", string(val1))

// stop everything and wait until it's shut down
cancel()
require.NoError(t, errg.Wait().ErrorOrNil())

require.EqualValues(t, 0, processed.Load())
require.EqualValues(t, 1, itemsRecovered.Load())

// run processor a second time, without recover forever
itemsRecovered.Store(0)
proc2 := createProc(false)
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
errg, ctx = multierr.NewErrGroup(ctx)
errg.Go(func() error {
return proc2.Run(ctx)
})

pollTimed(t, "procs 2 storage initialized", func() bool {
return len(storageTracker.storages) == 1
})

pollTimed(t, "procs recovered", proc2.Recovered)

// wait until the input is actually processed
pollTimed(t, "input processed", func() bool {
return processed.Load() == 1
})

// at this point we know the processor started, recovered, and consumed the one message in the input-table.
// Now make sure the processor did not recover again (because it did already in the first run)
require.EqualValues(t, 0, itemsRecovered.Load())

// stop everything and wait until it's shut down
cancel()
require.NoError(t, errg.Wait().ErrorOrNil())
}

// TestRebalance runs some processors to test rebalance. It's merely a
// runs-without-errors test, not a real functional test.
func TestRebalance(t *testing.T) {
Expand Down

0 comments on commit e157b9c

Please sign in to comment.