Skip to content

Commit

Permalink
elector & locker were failing to send out when not leader (#688)
Browse files Browse the repository at this point in the history
* elector & locker were failing to send out when not leader

* update test to confirm non-active elector/locker are checked

* clean up data race

* try to make test more reliable
  • Loading branch information
JohnRoesler authored Mar 12, 2024
1 parent c2f9575 commit ebec5e9
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 26 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ func main() {
}
```

## Examples

- [Go doc examples](https://pkg.go.dev/github.com/go-co-op/gocron/v2#pkg-examples)
- [Examples directory](examples)

## Concepts

- **Job**: The job encapsulates a "task", which is made up of a go function and any function parameters. The Job then
Expand Down
16 changes: 8 additions & 8 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,23 +516,23 @@ func ExampleWithClock() {
}

func ExampleWithDistributedElector() {
//var _ Elector = (*myElector)(nil)
//var _ gocron.Elector = (*myElector)(nil)
//
//type myElector struct{}
//
//func (m myElector) IsLeader(_ context.Context) error {
// return nil
//}
//
//elector := myElector{}
//elector := &myElector{}
//
//_, _ = NewScheduler(
// WithDistributedElector(elector),
//_, _ = gocron.NewScheduler(
// gocron.WithDistributedElector(elector),
//)
}

func ExampleWithDistributedLocker() {
//var _ Locker = (*myLocker)(nil)
//var _ gocron.Locker = (*myLocker)(nil)
//
//type myLocker struct{}
//
Expand All @@ -549,10 +549,10 @@ func ExampleWithDistributedLocker() {
// return nil
//}
//
//locker := myLocker{}
//locker := &myLocker{}
//
//_, _ = NewScheduler(
// WithDistributedLocker(locker),
//_, _ = gocron.NewScheduler(
// gocron.WithDistributedLocker(locker),
//)
}

Expand Down
73 changes: 73 additions & 0 deletions examples/elector/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/go-co-op/gocron/v2"
)

var _ gocron.Elector = (*myElector)(nil)

type myElector struct {
num int
leader bool
}

func (m myElector) IsLeader(_ context.Context) error {
if m.leader {
log.Printf("node %d is leader", m.num)
return nil
}
log.Printf("node %d is not leader", m.num)
return fmt.Errorf("not leader")
}

func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)

for i := 0; i < 3; i++ {
go func(i int) {
elector := &myElector{
num: i,
}
if i == 0 {
elector.leader = true
}

scheduler, err := gocron.NewScheduler(
gocron.WithDistributedElector(elector),
)
if err != nil {
log.Println(err)
return
}

_, err = scheduler.NewJob(
gocron.DurationJob(time.Second),
gocron.NewTask(func() {
log.Println("run job")
}),
)

if err != nil {
log.Println(err)
return
}
scheduler.Start()

if i == 0 {
time.Sleep(5 * time.Second)
elector.leader = false
}
if i == 1 {
time.Sleep(5 * time.Second)
elector.leader = true
}
}(i)
}

select {} // wait forever
}
20 changes: 7 additions & 13 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (e *executor) start() {
// complete.
standardJobsWg.Add(1)
go func(j internalJob) {
e.runJob(j, jIn.shouldSendOut)
e.runJob(j, jIn)
standardJobsWg.Done()
}(*j)
}
Expand Down Expand Up @@ -264,7 +264,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith
e.limitMode.singletonJobs[jIn.id] = struct{}{}
e.limitMode.singletonJobsMu.Unlock()
}
e.runJob(*j, jIn.shouldSendOut)
e.runJob(*j, jIn)

if j.singletonMode {
e.limitMode.singletonJobsMu.Lock()
Expand Down Expand Up @@ -302,7 +302,7 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup
j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
cancel()
if j != nil {
e.runJob(*j, jIn.shouldSendOut)
e.runJob(*j, jIn)
}

// remove the limiter block to allow another job to be scheduled
Expand All @@ -317,7 +317,7 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup
}
}

func (e *executor) runJob(j internalJob, shouldSendOut bool) {
func (e *executor) runJob(j internalJob, jIn jobIn) {
if j.ctx == nil {
return
}
Expand All @@ -331,26 +331,20 @@ func (e *executor) runJob(j internalJob, shouldSendOut bool) {

if e.elector != nil {
if err := e.elector.IsLeader(j.ctx); err != nil {
e.sendOutToScheduler(&jIn)
return
}
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil {
e.sendOutToScheduler(&jIn)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
}
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)

if shouldSendOut {
select {
case <-e.ctx.Done():
return
case <-j.ctx.Done():
return
case e.jobIDsOut <- j.id:
}
}
e.sendOutToScheduler(&jIn)

startTime := time.Now()
err := callJobFuncWithParams(j.function, j.parameters...)
Expand Down
54 changes: 49 additions & 5 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,7 @@ var _ Elector = (*testElector)(nil)
type testElector struct {
mu sync.Mutex
leaderElected bool
notLeader chan struct{}
}

func (t *testElector) IsLeader(ctx context.Context) error {
Expand All @@ -1152,6 +1153,7 @@ func (t *testElector) IsLeader(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.leaderElected {
t.notLeader <- struct{}{}
return fmt.Errorf("already elected leader")
}
t.leaderElected = true
Expand All @@ -1163,12 +1165,14 @@ var _ Locker = (*testLocker)(nil)
type testLocker struct {
mu sync.Mutex
jobLocked bool
notLocked chan struct{}
}

func (t *testLocker) Lock(_ context.Context, _ string) (Lock, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.jobLocked {
t.notLocked <- struct{}{}
return nil, fmt.Errorf("job already locked")
}
t.jobLocked = true
Expand All @@ -1184,21 +1188,58 @@ func (t testLock) Unlock(_ context.Context) error {
}

func TestScheduler_WithDistributed(t *testing.T) {
notLocked := make(chan struct{}, 10)
notLeader := make(chan struct{}, 10)

goleak.VerifyNone(t)
tests := []struct {
name string
count int
opt SchedulerOption
name string
count int
opt SchedulerOption
assertions func(*testing.T)
}{
{
"3 schedulers with elector",
3,
WithDistributedElector(&testElector{}),
WithDistributedElector(&testElector{
notLeader: notLeader,
}),
func(t *testing.T) {
timeout := time.Now().Add(1 * time.Second)
var notLeaderCount int
for {
if time.Now().After(timeout) {
break
}
select {
case <-notLeader:
notLeaderCount++
default:
}
}
assert.Equal(t, 2, notLeaderCount)
},
},
{
"3 schedulers with locker",
3,
WithDistributedLocker(&testLocker{}),
WithDistributedLocker(&testLocker{
notLocked: notLocked,
}),
func(t *testing.T) {
timeout := time.Now().Add(1 * time.Second)
var notLockedCount int
for {
if time.Now().After(timeout) {
break
}
select {
case <-notLocked:
notLockedCount++
default:
}
}
},
},
}

Expand All @@ -1222,6 +1263,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
),
NewTask(
func() {
time.Sleep(100 * time.Millisecond)
jobsRan <- struct{}{}
},
),
Expand Down Expand Up @@ -1263,6 +1305,8 @@ func TestScheduler_WithDistributed(t *testing.T) {
}

assert.Equal(t, 1, runCount)
time.Sleep(time.Second)
tt.assertions(t)
})
}
}
Expand Down

0 comments on commit ebec5e9

Please sign in to comment.